??
insert
into??job
??
EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);
String sourceDDL = "CREATE TABLE datagen ( " +
" f_random INT, " +
" f_random_str STRING, " +
" ts AS localtimestamp, " +
" WATERMARK FOR ts AS ts " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='10', " +
" 'fields.f_random.min'='1', " +
" 'fields.f_random.max'='5', " +
" 'fields.f_random_str.length'='10' " +
")";
bsTableEnv.executeSql(sourceDDL);
Table datagen = bsTableEnv.from("datagen");
System.out.println(datagen.getSchema());
String sinkDDL = "CREATE TABLE print_table (" +
" f_random int," +
" c_val bigint, " +
" wStart TIMESTAMP(3) " +
") WITH ('connector' = 'print') ";
bsTableEnv.executeSql(sinkDDL);
System.out.println(bsTableEnv.from("print_table").getSchema());
Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str),
TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by
TUMBLE(ts, INTERVAL '5' second), f_random");
bsTableEnv.executeSql("insert into print_table select * from " + table);