??????????Hive??????????: ?? ????????checkpoint ?? ???? watermark ?? ??????????????,??????????????????..........,????????,????????
------------------ ???????? ------------------ ??????: "user-zh" <liya...@huimin100.cn>; ????????: 2020??8??11??(??????) ????3:19 ??????: "user-zh"<user-zh@flink.apache.org>; ????: flink 1.11 ????sql??????????????hive ?????????????????????? ????hive??????????????TemporaryView??????????????tEnv.executeSql(insertSql)????????????????????????hive???????????????????????????????? StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());//????hive catalog String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a local path String version = "1.1.0"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tEnv.registerCatalog("myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.createTemporaryView("users", dataStream); Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users"); tEnv.toRetractStream(result3, TypeInformation.of(new TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");// ????hive???????????????????????????????????? 
// String hiveSql = "CREATE TABLE fs_table (\n" + // " user_id STRING,\n" + // " order_amount DOUBLE \n" + // ") partitioned by (dt string,h string,m string) \n" + // "stored as textfile \n" + // "TBLPROPERTIES (\n" + // " 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" + // " 'sink.partition-commit.delay'='0s',\n" + // " 'sink.partition-commit.trigger'='partition-time',\n" + // " 'sink.partition-commit.policy.kind'='metastore'" + // ")"; // tEnv.executeSql(hiveSql); String insertSql = "insert into table fs_table partition (dt,h,m) SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users"; tEnv.executeSql(insertSql); bsEnv.execute("test"); liya...@huimin100.cn