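Hi all, [the rest of this greeting is unrecoverable: the original non-ASCII (Chinese) text was replaced by `?` characters through an encoding error]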
[Editor's note: the Chinese prose of this message was destroyed by a character-encoding error (each non-ASCII character became `?`). The connecting text below is reconstructed from the surviving code and context. Code, DDL and command output are reproduced unchanged.]

Using the DataStream API, I consume data from Kafka and obtain a DataStream `ds1`.

I then create a `tableEnv` and register a Hive catalog on it:

    tableEnv.registerCatalog(catalogName, catalog);
    tableEnv.useCatalog(catalogName);

Next, I convert `ds1` into a table and register it as a temporary view:

    Table sourcetable = tableEnv.fromDataStream(ds1);
    String souceTableName = "music_source";
    tableEnv.createTemporaryView(souceTableName, sourcetable);

The Hive table was created with the following DDL. The column COMMENT strings were also garbled by the encoding error and are left as-is:

    CREATE TABLE `dwd_music_copyright_test`(
      `url` string COMMENT 'url',
      `md5` string COMMENT 'md5',
      `utime` bigint COMMENT '????',
      `title` string COMMENT '??????',
      `singer` string COMMENT '??????',
      `company` string COMMENT '????',
      `level` int COMMENT '??????.0??????????,1??acrcloud??????????,3??????????')
    PARTITIONED BY (
      `dt` string,
      `hour` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
    TBLPROPERTIES (
      'connector'='HiveCatalog',
      'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
      'sink.partition-commit.delay'='1 min',
      'sink.partition-commit.policy.kind'='metastore,success-file',
      'sink.partition-commit.trigger'='partition-time',
      'sink.rolling-policy.check-interval'='30s',
      'sink.rolling-policy.rollover-interval'='1min',
      'sink.rolling-policy.file-size'='1MB');

Step 3: write the data from the view into `dwd_music_copyright_test` [the exact wording of this step is unrecoverable].

Environment:
- flink: 1.11
- kafka: 1.1.1
- hadoop: 2.6.0
- hive: 1.2.0

Problem: the job writes to the table through the Hive catalog, but some partitions were never committed to the metastore. The partitions `hour=02` and `hour=03` are missing:

    show partitions rt_dwd.dwd_music_copyright_test;
    | dt=2020-08-29/hour=00 |
    | dt=2020-08-29/hour=01 |
    | dt=2020-08-29/hour=04 |
    | dt=2020-08-29/hour=05 |

However, the data directories for those hours do exist on HDFS:
    $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
    4.5 K  13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
    2.0 K  6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
    1.7 K  5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
    1.3 K  3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
    3.1 K  9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04

After manually running `add partition` for the missing hours, [the rest of this sentence is unrecoverable; it presumably reports that the data then became visible].

Looking at the Flink Web UI, [unrecoverable text about a checkpoint and the `StreamingFileCommitter` operator].

Question: [unrecoverable] exactly-once [unrecoverable] sink [unrecoverable] catalog [unrecoverable]?

[Reconstructed from context:] As far as I can tell, everything is configured for EXACTLY_ONCE. The Kafka consumer uses `isolation.level=read_committed` and `enable.auto.commit=false`. Checkpointing mode is set to EXACTLY_ONCE on both the stream environment and the table environment:

    streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);