Hi all,
????????????????????????????

????????

????DataStream API????kafka??????????????DataStream ds1??

????????tableEnv????????hive catalog??
// Register the catalog under catalogName, then make it the current catalog of tableEnv.
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
??????ds1??????????table
// Expose the DataStream ds1 to the Table API as the temporary view "music_source".
String souceTableName = "music_source";
Table sourcetable = tableEnv.fromDataStream(ds1);
tableEnv.createTemporaryView(souceTableName, sourcetable);
????????hive????
-- Parquet-backed table partitioned by (dt, hour).
-- NOTE(review): several COMMENT literals below contain '?' runs that look like
-- mis-encoded text; they are reproduced unchanged here -- confirm against the
-- original DDL.
CREATE TABLE `dwd_music_copyright_test` (
    `url`     string COMMENT 'url',
    `md5`     string COMMENT 'md5',
    `utime`   bigint COMMENT '????',
    `title`   string COMMENT '??????',
    `singer`  string COMMENT '??????',
    `company` string COMMENT '????',
    `level`   int    COMMENT '??????.0??????????,1??acrcloud??????????,3??????????'
)
PARTITIONED BY (
    `dt`   string,
    `hour` string
)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
    INPUTFORMAT  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
    'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
TBLPROPERTIES (
    'connector' = 'HiveCatalog',
    -- The partition timestamp is assembled from the dt and hour partition values.
    'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
    -- Partition-commit settings.
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'sink.partition-commit.trigger' = 'partition-time',
    -- File rolling settings.
    'sink.rolling-policy.check-interval' = '30s',
    'sink.rolling-policy.rollover-interval' = '1min',
    'sink.rolling-policy.file-size' = '1MB'
);
??step3??????????????dwd_music_copyright_test

????
flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0 
????

After the data is written through the hive catalog, some partitions are missing from the metastore (for example hour=02 and hour=03):
show partitions rt_dwd.dwd_music_copyright_test;
| dt=2020-08-29/hour=00  |
| dt=2020-08-29/hour=01  |
| dt=2020-08-29/hour=04  |
| dt=2020-08-29/hour=05  |
????hdfs??????????????????
$ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04
??????add partition????????????????????

????flink 
WebUI??????????????????checkpoint??StreamingFileCommitter??????????????????





??????

exactly-once??????????sink??????????????????catalog????

????????????????????????????????

EXACTLY_ONCE??????????????kafka????isolation.level=read_committed??enable.auto.commit=false??????????????????????????????EXACTLY_ONCE??
// Request EXACTLY_ONCE checkpointing on both the stream environment and the table configuration.
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(
        ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
        CheckpointingMode.EXACTLY_ONCE);

回复