?????? 1.11??????????TableEnvironment.executeSql("insert into ...")??job??????????????

2020-08-23 文章 Asahi Lee
??sqljob??




--  --
??: 
   "user-zh"



1.11??????????TableEnvironment.executeSql("insert into ...")??job??????????????

2020-08-20 文章 Asahi Lee
??
     insert 
into??job


??
// Build a TableEnvironment that uses the Blink planner.
EnvironmentSettings bbSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);

// Source table backed by the 'datagen' connector: f_random is bounded to
// [1, 5], f_random_str has length 10, and ts is a computed column
// (localtimestamp) that also serves as the watermark column.
String sourceDDL = "CREATE TABLE datagen (  " +
        " f_random INT,  " +
        " f_random_str STRING,  " +
        " ts AS localtimestamp,  " +
        " WATERMARK FOR ts AS ts  " +
        ") WITH (  " +
        " 'connector' = 'datagen',  " +
        " 'rows-per-second'='10',  " +
        " 'fields.f_random.min'='1',  " +
        " 'fields.f_random.max'='5',  " +
        " 'fields.f_random_str.length'='10'  " +
        ")";

bsTableEnv.executeSql(sourceDDL);
Table datagen = bsTableEnv.from("datagen");

System.out.println(datagen.getSchema());

// Sink table backed by the 'print' connector; its columns line up
// positionally with the aggregation query below.
String sinkDDL = "CREATE TABLE print_table (" +
        " f_random int," +
        " c_val bigint, " +
        " wStart TIMESTAMP(3) " +
        ") WITH ('connector' = 'print') ";
bsTableEnv.executeSql(sinkDDL);

System.out.println(bsTableEnv.from("print_table").getSchema());

// Count rows per f_random in 5-second tumbling windows over ts.
// The query is assembled from single-line literals: a Java string literal
// may not span physical lines, so the hard-wrapped original did not compile.
String aggregateQuery = "select f_random, count(f_random_str), " +
        "TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by " +
        "TUMBLE(ts, INTERVAL '5' second), f_random";
Table table = bsTableEnv.sqlQuery(aggregateQuery);

// Concatenating the Table into the statement relies on Table#toString()
// yielding an identifier usable in SQL -- NOTE(review): confirm for the
// Flink version in use.
bsTableEnv.executeSql("insert into print_table select * from " + table);