??????????Hive??????????:
?? ????????checkpoint
?? ???? wartermark
?? ??????????????,??????????????????..........,????????,????????




------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<liya...@huimin100.cn&gt;;
????????:&nbsp;2020??8??11??(??????) ????3:19
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;flink 1.11 ????sql??????????????hive



??????????????????????&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
????hive??????????????TemporaryView??????????????tEnv.executeSql(insertSql)????????????????????????hive????????????????????????????????

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);DataStream<UserInfo&gt; dataStream = bsEnv.addSource(new 
MySource());//????hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a local 
path
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

tEnv.createTemporaryView("users", dataStream);

Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, 
'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM 
users");


tEnv.toRetractStream(result3, TypeInformation.of(new 
TypeHint<Tuple5<String,Double,String,String,String&gt;&gt;(){})).print("res");//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ????hive????????????????????????????????????
//&nbsp;&nbsp;&nbsp; String hiveSql = "CREATE TABLE fs_table (\n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; user_id STRING,\n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; order_amount DOUBLE \n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ") partitioned by (dt string,h string,m string) \n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "stored as textfile \n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "TBLPROPERTIES (\n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; 'sink.partition-commit.delay'='0s',\n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; 'sink.partition-commit.trigger'='partition-time',\n" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; 'sink.partition-commit.policy.kind'='metastore'" +
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ")";
//&nbsp;&nbsp;&nbsp; tEnv.executeSql(hiveSql);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String insertSql = "insert into 
table fs_table partition (dt,h,m) SELECT userId, amount, DATE_FORMAT(ts, 
'yyyy-MM-dd') dt, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tEnv.executeSql(insertSql);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsEnv.execute("test");


liya...@huimin100.cn

回复