Could you share which configuration option that is? Is there any related documentation?

superainbower <superainbo...@163.com> wrote on Fri, Sep 4, 2020 at 5:24 PM:

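> Version 1.11 added a new configuration option that avoids the problem where data skew leaves some partition without data and the computation is therefore never triggered.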
>
>
> superainbower
> superainbo...@163.com
>
>
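> On Sep 4, 2020 at 15:11, taochanglian <taochangl...@163.com> wrote:
> That is indeed the case. For example, if you have multiple partitions but only one of them has data, the watermark does not advance and the computation is not triggered. You need to make sure that every partition receives data.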
>
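> As an example: in my own test I had 1 topic with 10 partitions. Because it was a test environment and records were hashed by key, all data went into a single partition and the computation was never triggered. After I changed the keys so that records were spread round-robin over all 10 partitions, the computation was triggered.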
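
To illustrate the behaviour described above: an operator that reads several inputs can only advance its event-time clock to the minimum of the watermarks seen on each input, so one silent partition holds everything back. Below is a minimal plain-Java sketch of that rule. It is a toy model, not Flink code; the class `MinWatermark` and its members are made up for illustration.

```java
import java.util.Arrays;

/** Toy model (not Flink code): the combined watermark is the minimum over all inputs. */
final class MinWatermark {
    /** Value used in this sketch for "no watermark received yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Returns the combined watermark over all partitions. */
    static long combined(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // 10 partitions, but key hashing sent every record to partition 0.
        long[] skewed = new long[10];
        Arrays.fill(skewed, NO_WATERMARK);
        skewed[0] = 1598951706999L;
        System.out.println(combined(skewed) == NO_WATERMARK); // true: no progress

        // After spreading the records round-robin, every partition reports a watermark.
        long[] balanced = new long[10];
        Arrays.fill(balanced, 1598951706999L);
        System.out.println(combined(balanced)); // 1598951706999
    }
}
```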
>
> On 2020/9/4 13:14, Benchao Li wrote:
> If you have multiple partitions and some of them have no data, this problem does indeed occur.
> To handle this case, take a look at idle sources [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
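
The idea behind idle sources can be sketched in plain Java as follows: an input that has been silent for longer than a timeout is left out of the minimum, so it no longer holds back the others. This is a simplified toy model, not Flink's implementation; the class `IdleAwareWatermark`, its parameters, and the choice to report no progress when every input is idle are assumptions made for this sketch.

```java
/** Toy model (not Flink code): inputs that have been silent too long are ignored. */
final class IdleAwareWatermark {
    /** Value used in this sketch for "no watermark available". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Minimum watermark over all inputs that received a record within the timeout.
     * Returns NO_WATERMARK when every input is idle (a simplification of this sketch).
     */
    static long combined(long[] watermarks, long[] lastRecordAtMillis,
                         long nowMillis, long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            boolean idle = nowMillis - lastRecordAtMillis[i] >= idleTimeoutMillis;
            if (idle) {
                continue; // an idle input does not take part in the minimum
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        long[] watermarks = {5_000L, NO_WATERMARK, NO_WATERMARK};
        long[] lastRecordAt = {19_000L, 0L, 0L}; // inputs 1 and 2 never saw a record
        long now = 20_000L;

        // With a 10 s idle timeout, only input 0 counts.
        System.out.println(combined(watermarks, lastRecordAt, now, 10_000L)); // 5000

        // With an effectively infinite timeout, the empty inputs block progress.
        System.out.println(
            combined(watermarks, lastRecordAt, now, Long.MAX_VALUE) == NO_WATERMARK); // true
    }
}
```

Note that in this model the watermark still only moves when at least one input keeps receiving records; marking inputs idle does not by itself move event time forward.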
>
> samuel....@ubtrobot.com <samuel....@ubtrobot.com> wrote on Thu, Sep 3, 2020 at 3:41 PM:
>
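> Some additional information about the environment:
>
> This looks somewhat similar to the following issue:
> While testing Flink SQL on version 1.11, I ran into a problem. The job consumes Kafka with the streaming API, uses event time, converts the stream to a table, and runs a SQL aggregation. When the Kafka topic has multiple partitions, the Flink web UI shows "No Watermark" under Watermarks and the aggregation is never triggered. When the Kafka topic has only one partition, the computation is triggered normally and the watermarks are displayed correctly.
>
> Could this be caused by Kafka having multiple partitions?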
>
>
>
> From: samuel....@ubtrobot.com
> Sent: 2020-09-03 09:23
> To: user-zh
> Subject: Re: Re: A question about time windows, thank you very much!
> Thanks for the reply!
>
> The version is Flink 1.11.1.
>
> Here is the code:
> package com.ubtechinc.dataplatform.flink.etl.exception.monitor;
>
> /*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements.  See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership.  The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License.  You may obtain a copy of the License at
> *
> *     http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
>
> import org.apache.commons.collections.map.HashedMap;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.state.BroadcastState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
> import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONArray;
> import com.alibaba.fastjson.JSONObject;
> import com.alibaba.fastjson.parser.Feature;
> import com.ubtechinc.dataplatform.flink.util.AES256;
> import com.ubtechinc.dataplatform.flink.util.ConstantStr;
> import com.ubtechinc.dataplatform.flink.util.MailUtils;
> import com.ubtechinc.dataplatform.flink.util.SmsUtil;
> import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;
>
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import com.mysql.jdbc.Connection;
>
> import java.sql.Timestamp;
> import java.text.MessageFormat;
> import java.time.Duration;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> /**
> * Use broadcast to implement dynamic configuration updates
> */
> public class ExceptionAlertHour4{
>
> private static final Logger LOG =
> LoggerFactory.getLogger(ExceptionAlertHour4.class);
>
> public static void main(String[] args) throws Exception{
> ParameterTool parameterTool = ParameterTool.fromArgs(args);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setGlobalJobParameters(parameterTool);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //System.out.println(parameterTool.get("envFlag"));
> Properties properties = new Properties();
> String hdfs = null;
> long maxTime = 0;
> long maxSize = 0;
> long inActive = 0;
> DataStream<String> ds =null;
>
> if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) {
> hdfs = ConstantStr.HDFS_IP_PORT_TEST;
> maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST;
> maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST;
> inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST;
> properties.setProperty("bootstrap.servers",
> ConstantStr.KAFKA_IP_PORT_TEST);
> properties.setProperty("zookeeper.connect",
> ConstantStr.ZOOKEEPER_IP_PORT_TEST);
> properties.setProperty("group.id",
> "etl.exception.monitor.ExceptionAlertHour4-001");
> Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0),
> 2800L);
> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1),
> 2700L);
> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2),
> 3300L);
> ds = env.addSource(new
> FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new
> SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
> } else if
> (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
> hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
> maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
> maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
> inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
> properties.setProperty("bootstrap.servers",
> ConstantStr.KAFKA_IP_PORT_PRODUCT);
> properties.setProperty("zookeeper.connect",
> ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
> properties.setProperty("group.id",
> "etl.exception.monitor.ExceptionAlertHour-001");
> properties.setProperty("auto.offset.reset", "earliest");
> ds = env.addSource(new
> FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new
> SimpleStringSchema(), properties));
> } else {
> System.exit(-1);
> }
> // transform
> SingleOutputStreamOperator<Tuple4<String,String,String,Long>> singleDS =
> ds.flatMap(new FlatMapFunction<String, Tuple4<String,String,String,Long>>()
> {
>
> @Override
> public void flatMap(String value,
> Collector<Tuple4<String,String,String,Long>> out) {
>
> //System.out.println("Kafka2Hdfs-in:" + value);
> String newStr =
> value.replaceAll("\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t",
> "<brbr>");
> //System.out.println("Kafka2Hdfs-newStr:" + newStr);
>
> try {
> // parse the JSON record
> JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);
>
> // get the latest field values
> JSONArray bodyDataArray = record.getJSONArray("body_data");
> // iterate over the JSON array of field values; it contains only one element
> for (int i = 0; i < bodyDataArray.size(); i++) {
> // get the i-th element of the JSON array
> JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);
>
> if (bodyDataObj != null) {
> Tuple4<String,String,String,Long> log = Tuple4.of(
> record.getString("HW-AppId"),
> bodyDataObj.getString("HW-bugId"),
> bodyDataObj.getString("HW-bugType"),
> Long.valueOf(bodyDataObj.getString("HW-happenedAt"))
> );
> out.collect(log);
> }
> }
>
> } catch (Exception e) {
> System.out.println(e.getMessage());
> }
> }
>
> });
> singleDS.print();
> // specify the event-time field and generate watermarks
> DataStream<Tuple4<String,String,String,Long>>
> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10))
> .withTimestampAssigner((event, timestamp)->event.f3));
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
>
> String sql = "select  appid,eventid,cnt," +
> "(starttime + interval '8' hour ) as stime," +
> "(endtime + interval '8' hour ) as etime  " +
> "from (select appid,eventid,count(*) as cnt," +
> "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
> "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
> "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";
>
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
> dataStream.print();
>
> env.execute("etl.exception.monitor.ExceptionAlertHour");
> }
>
>
>
> public static class Result{
> private String appid;
> private String eventid;
> private long cnt;
> private Timestamp stime;
> private Timestamp etime;
> public String getAppid() {
> return appid;
> }
>
> public void setAppid(String appid) {
> this.appid = appid;
> }
>
> public String getEventid() {
> return eventid;
> }
>
> public void setEventid(String eventid) {
> this.eventid = eventid;
> }
>
> public long getCnt() {
> return cnt;
> }
>
> public void setCnt(long cnt) {
> this.cnt = cnt;
> }
>
>
> public Timestamp getStime(){
> return stime;
> }
>
> public void setStime(Timestamp stime){
> this.stime = stime;
> }
>
> public Timestamp getEtime(){
> return etime;
> }
>
> public void setEtime(Timestamp etime){
> this.etime = etime;
> }
>
> @Override
> public String toString(){
> return "ResultHour{" +
> "appid=" + appid +
> ",eventid=" + eventid +
> ",cnt=" + cnt +
> ", stime=" + stime +
> ", etime=" + etime +
> ", SystemTime=" + System.currentTimeMillis() +
> '}';
> }
> }
>
> }
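
A side note on the `+ interval '8' hour` in the query above: it presumably shifts the window bounds from UTC to UTC+8 for display. The small `java.time` sketch below shows the same 8-hour shift on one of the event timestamps posted in this thread (1598951712000). The class name `EpochToLocal` is hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Converts epoch milliseconds to a wall-clock time at a given UTC offset. */
final class EpochToLocal {
    static LocalDateTime at(long epochMillis, ZoneOffset offset) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), offset);
    }

    public static void main(String[] args) {
        long eventTime = 1598951712000L; // sample timestamp from this thread
        System.out.println(at(eventTime, ZoneOffset.UTC));        // 2020-09-01T09:15:12
        System.out.println(at(eventTime, ZoneOffset.ofHours(8))); // 2020-09-01T17:15:12
    }
}
```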
>
>
> From: jacky-cui
> Sent: 2020-09-02 18:58
> To: user-zh
> Subject: Re: A question about time windows, thank you very much!
> Which Flink version is this? Can you post the full code?
>
>
> ------------------ Original message ------------------
> From: "user-zh" <samuel....@ubtrobot.com>
> Sent: Wednesday, September 2, 2020, 3:20 PM
> To: "user-zh" <user-zh@flink.apache.org>
>
> Subject: A question about time windows, thank you very much!
>
>
>
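> Hello experts: I am using Flink SQL and want to aggregate with an hourly tumble window. The problem is that the computation is not triggered when the hour boundary is reached. Please take a look!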
>
> // specify the event-time field and generate watermarks
> DataStream<Tuple4<String,String,String,Long>>
> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10))   // intended to generate watermarks even when there is no data
> .withTimestampAssigner((event, timestamp)->event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
> "(starttime + interval '8' hour ) as stime," +
> "(endtime + interval '8' hour ) as etime " +
> "from (select appid,eventid,count(*) as cnt," +
> "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
> "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
> "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";    // expected: the window closes when the hour ends
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
>
> The output is:
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> I expected the window to close once 2020-09-01 18:00:00.0 was reached, but that did not happen.
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
> The window was only triggered after this record arrived:
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
> What went wrong here? Many thanks!
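
One way to read the output above is to redo the watermark arithmetic by hand. The sketch below is plain Java, not Flink code. It assumes that the bounded-out-of-orderness generator emits `max timestamp - 5 s - 1 ms` and that an event-time window fires once the watermark reaches the window's last millisecond, which is my understanding of Flink's behaviour. The class `TumbleWindowFiring` and its method names are made up for illustration.

```java
/** Toy arithmetic (not Flink code) for a 1-hour tumbling window with a 5 s bound. */
final class TumbleWindowFiring {
    static final long HOUR_MS = 3_600_000L;
    static final long OUT_OF_ORDERNESS_MS = 5_000L;

    /** Start of the hourly window that contains the given timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, HOUR_MS);
    }

    /** Watermark after the largest timestamp seen so far (assumed formula). */
    static long watermarkAfter(long maxTimestampMs) {
        return maxTimestampMs - OUT_OF_ORDERNESS_MS - 1;
    }

    /** A window fires once the watermark reaches its last millisecond (assumed rule). */
    static boolean fires(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + HOUR_MS - 1;
    }

    public static void main(String[] args) {
        long first = 1598951712000L; // the four identical records
        long later = 1599030999000L; // the record that arrived the next day

        long start = windowStart(first);
        System.out.println(start);                               // 1598950800000
        System.out.println(fires(start, watermarkAfter(first))); // false
        System.out.println(fires(start, watermarkAfter(later))); // true
    }
}
```

Under these assumptions, the watermark after the first four records is still about 45 minutes short of the window end, and nothing moves it forward until a record with a later event time arrives. As far as I understand, `withIdleness` only marks an input as idle so that it stops holding back other inputs; it does not advance the watermark by wall-clock time.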
>
>
>
