这种情况一般是 Kafka 的某个分区没有数据,导致整体的 watermark 无法前进。遇到这种情况,一般需要手动设置 idle
source[1]。但是社区的 watermark push down 目前存在一些问题[2],已经在修复了。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
[2]
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel


花乞丐 <huaxiapa...@163.com> 于2021年1月18日周一 上午11:42写道:

>
> 代码已经附上。目前数据已经写入 HDFS,也有文件生成,但是添加的水印无效,所以 metastore 信息一直没有更新,导致 metastore 中一直没有分区信息,必须在 hive
> shell 中执行命令:hive (ods)> msck repair table order_info
>
> 之后才可以查询到数据。经过 debug 发现,在分区提交的时候,需要判断水印的值比从分区提取的值 + 延迟时间大,才会提交分区。但是现在水印的值一直是 Long.MIN_VALUE,导致一直无法提交分区。我在代码中已经设置了水印,是不是我的水印设置姿势不对?还请指教!
> package com.zallsteel.flink.app.log;
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import com.google.gson.Gson;
> import com.google.gson.JsonArray;
> import com.google.gson.JsonElement;
> import com.google.gson.JsonParser;
> import com.zallsteel.flink.entity.ChangelogVO;
> import com.zallsteel.flink.entity.OrderInfo;
> import com.zallsteel.flink.utils.ConfigUtils;
>
> import lombok.SneakyThrows;
> import org.apache.commons.lang3.time.FastDateFormat;
> import org.apache.flink.api.common.eventtime.*;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
>
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> import java.text.ParseException;
> import java.time.Duration;
> import java.util.Date;
> import java.util.Properties;
>
> /**
>  * Test job for MySQL CDC to Hive. It copies cdc.order_info into the Kafka table
>  * kafka.order_info (topic order_info), re-reads that topic as plain strings,
>  * parses each record with Gson into a Row that carries an extra "op" column,
>  * and inserts those rows into the partitioned Hive table ods.order_info.
>  *
>  * @author Jackie Zhu
>  * @time 2021-01-13 16:50:18
>  * @desc Test MySQL CDC to Hive
>  */
> public class MySQLCDC2HiveApp {
>     public static void main(String[] args) {
>         // Obtain the stream execution environment
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // Set the parallelism
>         env.setParallelism(6);
>         // Enable checkpointing
>         env.enableCheckpointing(60000);
>         env.getConfig().setAutoWatermarkInterval(200);
>         // Configure the Flink SQL environment settings
>         EnvironmentSettings tableEnvSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         // Create the table environment
>         StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env,
> tableEnvSettings);
>         // Set the checkpointing mode
>
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>                 CheckpointingMode.EXACTLY_ONCE);
>         // Set the checkpoint interval
>
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>                 Duration.ofMinutes(1));
>         // Name of the catalog
>         String catalogName = "devHive";
>         // Create the HiveCatalog
>         HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
>                 "default",
>                 "/home/beggar/tools/apache-hive-3.1.2-bin/conf",
>                 "/home/beggar/tools/hadoop-3.1.1/etc/hadoop",
>                 "3.1.2"
>         );
>         // Register the Hive catalog
>         tableEnv.registerCatalog(catalogName,hiveCatalog);
>         // Use the Hive catalog
>         tableEnv.useCatalog(catalogName);
>         // Create the database for the MySQL CDC source
>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>         // Create the MySQL CDC source table
>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
>         tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
>                 "    id BIGINT,\n" +
>                 "    user_id BIGINT,\n" +
>                 "    create_time TIMESTAMP,\n" +
>                 "    operate_time TIMESTAMP,\n" +
>                 "    province_id INT,\n" +
>                 "    order_status STRING,\n" +
>                 "    total_amount DECIMAL(10, 5)\n" +
>                 "  ) WITH (\n" +
>                 "    'connector' = 'mysql-cdc',\n" +
>                 "    'hostname' = 'beggar',\n" +
>                 "    'port' = '3306',\n" +
>                 "    'username' = 'root',\n" +
>                 "    'password' = '123456',\n" +
>                 "    'database-name' = 'cdc',\n" +
>                 "    'table-name' = 'order_info'\n" +
>                 ")");
>         // Create the Kafka source table
>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
>         tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
>                 "id BIGINT,\n" +
>                 "user_id BIGINT,\n" +
>                 "create_time TIMESTAMP,\n" +
>                 "operate_time TIMESTAMP,\n" +
>                 "province_id INT,\n" +
>                 "order_status STRING,\n" +
>                 "total_amount DECIMAL(10, 5)\n" +
>                 ") WITH (\n" +
>                 "'connector' = 'kafka',\n" +
>                 "'topic' = 'order_info',\n" +
>                 "'scan.startup.mode' = 'earliest-offset',\n" +
>                 "'properties.bootstrap.servers' = 'beggar.dev:9092',\n" +
>                 "'format' = 'changelog-json'\n" +
>                 ")");
>         // Insert data into the Kafka table
>         tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
>                 "SELECT id, user_id, create_time,
> operate_time,province_id,order_status,total_amount\n" +
>                 "FROM cdc.order_info");
>         // Custom stream that carries an "op" field
>         Properties kafkaConfig = ConfigUtils.getKafkaConfig();
>         FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
>                 "order_info",
>                 new SimpleStringSchema(),
>                 kafkaConfig
>         ).setStartFromEarliest();
>         DataStreamSource<String> streamSource = env.addSource(consumer);
>
>
>
>         String[] fieldNames  =
>
> {"id","user_id","create_time","operate_time","province_id","order_status","total_amount","op"};
>
>         TypeInformation[] types =
>
> {Types.LONG,Types.LONG,Types.STRING,Types.STRING,Types.INT,Types.INT,Types.DOUBLE,Types.STRING};
>
>         SingleOutputStreamOperator<Row> ds2 = streamSource.map(new
> MapFunction<String, Row>() {
>             @Override
>             public Row map(String value) throws Exception {
>                 Gson gson = new Gson();
>                 ChangelogVO changelogVO = gson.fromJson(value,
> ChangelogVO.class);
>                 String op = changelogVO.getOp();
>                 int arity = fieldNames.length;
>                 Row row = new Row(arity);
>                 row.setField(0, changelogVO.getData().getId());
>                 row.setField(1, changelogVO.getData().getUserId());
>                 row.setField(2, changelogVO.getData().getCreateTime());
>                 row.setField(3, changelogVO.getData().getOperateTime());
>                 row.setField(4, changelogVO.getData().getProviceId());
>                 row.setField(5, changelogVO.getData().getOrderStatus());
>                 row.setField(6, changelogVO.getData().getTotalAmount());
>                 String operation = getOperation(op);
>                 row.setField(7, operation);
>                 return row;
>             }
>
>             private String getOperation(String op) { // Maps a RowKind short string to an operation name; any unmatched value yields "INSERT"
>                 String operation = "INSERT";
>                 for (RowKind rk : RowKind.values()) {
>                     if (rk.shortString().equals(op)) {
>                         switch (rk) {
>                             case UPDATE_BEFORE:
>                                 operation = "UPDATE-BEFORE";
>                                 break;
>                             case UPDATE_AFTER:
>                                 operation = "UPDATE-AFTER";
>                                 break;
>                             case DELETE:
>                                 operation = "DELETE";
>                                 break;
>                             case INSERT:
>                             default:
>                                 operation = "INSERT";
>                                 break;
>                         }
>                         break;
>                     }
>                 }
>                 return operation;
>             }
>         }, new RowTypeInfo(types, fieldNames));
>         // Assign watermarks. NOTE(review): the stream returned by assignTimestampsAndWatermarks is not captured (the view below is registered from ds2 itself) and the strategy has no timestamp assigner; this is presumably why the watermark stays at Long.MIN_VALUE and partitions are never committed - confirm.
>
>
> ds2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)));
>         tableEnv.createTemporaryView("merged_order_info", ds2);
>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
>         tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
>                 "  id BIGINT,\n" +
>                 "   user_id BIGINT,\n" +
>                 "   create_time STRING,\n" +
>                 "   operate_time STRING,\n" +
>                 "   province_id INT,\n" +
>                 "   order_status INT,\n" +
>                 "   total_amount DOUBLE,\n" +
>                 "   op STRING \n" +
>                 ") PARTITIONED BY (dt STRING, hr STRING,sec STRING) STORED
> AS parquet TBLPROPERTIES (\n" +
>                 "  'partition.time-extractor.timestamp-pattern'='$dt
> $hr:$sec:00',\n" +
>                 "  'sink.partition-commit.trigger'='partition-time',\n" +
>                 "  'sink.partition-commit.delay'='1 min',\n" +
>                 "
> 'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
>                 ")");
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         TableResult tableResult = tableEnv.executeSql("INSERT INTO
> ods.order_info\n" +
>                 "SELECT \n" +
>                 "id,\n" +
>                 "user_id,\n" +
>                 "create_time,\n" +
>                 "operate_time,\n" +
>                 "province_id,\n" +
>                 "order_status,\n" +
>                 "total_amount,\n" +
>                 "op,\n" +
>                 "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd
> HH:mm:ss'),'yyyy-MM-dd') as dt,\n" +
>                 "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd
> HH:mm:ss'),'HH') as hr,\n" +
>                 "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd
> HH:mm:ss'),'mm') as sec\n" +
>                 "FROM merged_order_info"
>         );
>         try {
>             tableEnv.execute("mysqlcdc to hive");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>
>
>     }
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复