??sql????????????????????????????????job??????????????
------------------ ???????? ------------------ ??????: "user-zh" <zoud...@163.com>; ????????: 2020??8??23??(??????) ????2:29 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: 1.11??????????TableEnvironment.executeSql("insert into ...")??job?????????????? ?????????????????????????????????? jobName > 2020??8??21?? ????11:11??Asahi Lee <978466...@qq.com> ?????? > > ?????? >      ????????????????insert into??????????????????????????????job???????? > > > ?????? > EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > String sourceDDL = "CREATE TABLE datagen ( " + > " f_random INT, " + > " f_random_str STRING, " + > " ts AS localtimestamp, " + > " WATERMARK FOR ts AS ts " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='10', " + > " 'fields.f_random.min'='1', " + > " 'fields.f_random.max'='5', " + > " 'fields.f_random_str.length'='10' " + > ")"; > > bsTableEnv.executeSql(sourceDDL); > Table datagen = bsTableEnv.from("datagen"); > > System.out.println(datagen.getSchema()); > > String sinkDDL = "CREATE TABLE print_table (" + > " f_random int," + > " c_val bigint, " + > " wStart TIMESTAMP(3) " + > ") WITH ('connector' = 'print') "; > bsTableEnv.executeSql(sinkDDL); > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); > bsTableEnv.executeSql("insert into print_table select * from " + table);