The failure screenshots didn't come through. What exactly is the exception? On Mon, Sep 7, 2020 at 10:23 AM MuChen <9329...@qq.com> wrote:
> hi, all:
> Could anyone help take a look at this problem? Thanks!
>
> The processing logic is as follows:
> 1. Read data from Kafka with the DataStream API into a DataStream ds1.
> 2. Create a tableEnv and register a Hive catalog:
>    tableEnv.registerCatalog(catalogName, catalog);
>    tableEnv.useCatalog(catalogName);
> 3. Declare a table backed by ds1:
>    Table sourcetable = tableEnv.fromDataStream(ds1);
>    String souceTableName = "music_source";
>    tableEnv.createTemporaryView(souceTableName, sourcetable);
> 4. Create a Hive table:
>
> CREATE TABLE `dwd_music_copyright_test`(
>   `url` string COMMENT 'url',
>   `md5` string COMMENT 'md5',
>   `utime` bigint COMMENT 'time',
>   `title` string COMMENT 'song title',
>   `singer` string COMMENT 'singer',
>   `company` string COMMENT 'company',
>   `level` int COMMENT 'confidence: 0 = title word segmentation, 1 = result returned by acrcloud, 3 = manually labeled')
> PARTITIONED BY (
>   `dt` string,
>   `hour` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION
>   'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
> TBLPROPERTIES (
>   'connector'='HiveCatalog',
>   'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.rolling-policy.check-interval'='30s',
>   'sink.rolling-policy.rollover-interval'='1min',
>   'sink.rolling-policy.file-size'='1MB');
>
> 5.
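[Editor's note: a brief aside on how the `partition.time-extractor.timestamp-pattern` and `sink.partition-commit.delay` properties above interact. Conceptually, the partition values are substituted into the pattern to obtain a partition time, and the partition becomes eligible for commit once the watermark passes that time plus the delay. This is only a minimal, self-contained sketch of that idea; the class and method names are made up and this is not Flink's actual implementation:]

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical helper mimicking 'partition.time-extractor.timestamp-pattern'
// resolution and the partition-time commit trigger. Illustration only.
public class PartitionTimeSketch {

    // Substitute partition values into the pattern, e.g. "$dt $hour:00:00"
    // with {dt=2020-08-29, hour=02} becomes "2020-08-29 02:00:00".
    static LocalDateTime extractPartitionTime(String pattern, Map<String, String> partitionSpec) {
        String resolved = pattern;
        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
            resolved = resolved.replace("$" + e.getKey(), e.getValue());
        }
        return LocalDateTime.parse(resolved, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }

    // A partition is ready to commit once the watermark passes
    // partition time + sink.partition-commit.delay.
    static boolean readyToCommit(LocalDateTime partitionTime, Duration delay, LocalDateTime watermark) {
        return !watermark.isBefore(partitionTime.plus(delay));
    }

    public static void main(String[] args) {
        Map<String, String> spec = Map.of("dt", "2020-08-29", "hour", "02");
        LocalDateTime t = extractPartitionTime("$dt $hour:00:00", spec);
        System.out.println(t); // 2020-08-29T02:00
        // Watermark 02:00:30 has not yet passed 02:00 + 1 min, so no commit yet.
        System.out.println(readyToCommit(t, Duration.ofMinutes(1),
                LocalDateTime.of(2020, 8, 29, 2, 0, 30))); // false
    }
}
```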
> Insert the data from the table in step 3 into dwd_music_copyright_test.
>
> Environment:
>
> flink: 1.11
> kafka: 1.1.1
> hadoop: 2.6.0
> hive: 1.2.0
>
> Problem:
> After the job has been running, some partitions were not created in the Hive catalog; here the hour=02 and hour=03 partitions are missing:
>
> show partitions rt_dwd.dwd_music_copyright_test;
>
> | dt=2020-08-29/hour=00 |
> | dt=2020-08-29/hour=01 |
> | dt=2020-08-29/hour=04 |
> | dt=2020-08-29/hour=05 |
>
> But files were generated under the HDFS directories:
>
> $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
> 4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
> 2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
> 1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
> 1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
> 3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04
>
> And after manually adding the partitions, the data can be read normally.
>
> The Flink web UI shows that some checkpoints failed at the StreamingFileCommitter during this period:
>
> [checkpoint failure screenshots not included]
>
> Questions:
>
> 1. Does exactly-once only guarantee writing the sink files, but not updating the catalog?
> 2. If so, is there any way to work around this?
> 3. For EXACTLY_ONCE, is it necessary to set the Kafka parameters isolation.level=read_committed and enable.auto.commit=false? Or is the following configuration enough to guarantee EXACTLY_ONCE?
>
> streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>     CheckpointingMode.EXACTLY_ONCE);

--
Best, Jingsong Lee
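[Editor's note: independent of the root cause, the mismatch reported above can be detected mechanically by diffing the partition directories present on HDFS against the partitions the metastore reports. A small sketch, using plain Java collections standing in for the HDFS listing and the `show partitions` output; the class and method names are hypothetical:]

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical check: which partition directories exist on HDFS but are
// missing from the Hive metastore (and thus need a manual ADD PARTITION)?
public class MissingPartitionsSketch {

    static Set<String> missingFromMetastore(List<String> hdfsDirs, List<String> metastorePartitions) {
        Set<String> missing = new TreeSet<>(hdfsDirs);
        missing.removeAll(metastorePartitions);
        return missing;
    }

    public static void main(String[] args) {
        // Directory listing from `hadoop fs -du` in the report above.
        List<String> onHdfs = List.of(
                "dt=2020-08-29/hour=00", "dt=2020-08-29/hour=01",
                "dt=2020-08-29/hour=02", "dt=2020-08-29/hour=03",
                "dt=2020-08-29/hour=04");
        // Output of `show partitions` in the report above.
        List<String> inMetastore = List.of(
                "dt=2020-08-29/hour=00", "dt=2020-08-29/hour=01",
                "dt=2020-08-29/hour=04", "dt=2020-08-29/hour=05");
        // Prints [dt=2020-08-29/hour=02, dt=2020-08-29/hour=03]
        System.out.println(missingFromMetastore(onHdfs, inMetastore));
    }
}
```

Each entry reported this way could then be registered manually with `ALTER TABLE rt_dwd.dwd_music_copyright_test ADD PARTITION (...)`, which is what the author did to make the data readable.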