??sql????????????????????????????????job??????????????



------------------ 原始邮件 ------------------
发件人: "user-zh" <zoud...@163.com>;
发送时间: 2020年8月23日(星期天) 下午2:29
收件人: "user-zh"<user-zh@flink.apache.org>;

主题: Re: 1.11版本，使用TableEnvironment.executeSql("insert into ...")，job名称无法自定义



?????????????????????????????????? jobName

> 2020年8月21日 上午11:11，Asahi Lee <978466...@qq.com> 写道：
>
> ??????
>      ????????????????insert into??????????????????????????????job????????
>
>
> ??????
> EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
> TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);
>
> String sourceDDL = "CREATE TABLE datagen (  " +
>         " f_random INT,  " +
>         " f_random_str STRING,  " +
>         " ts AS localtimestamp,  " +
>         " WATERMARK FOR ts AS ts  " +
>         ") WITH (  " +
>         " 'connector' = 'datagen',  " +
>         " 'rows-per-second'='10',  " +
>         " 'fields.f_random.min'='1',  " +
>         " 'fields.f_random.max'='5',  " +
>         " 'fields.f_random_str.length'='10'  " +
>         ")";
>
> bsTableEnv.executeSql(sourceDDL);
> Table datagen = bsTableEnv.from("datagen");
>
> System.out.println(datagen.getSchema());
>
> String sinkDDL = "CREATE TABLE print_table (" +
>         " f_random int," +
>         " c_val bigint, " +
>         " wStart TIMESTAMP(3) " +
>         ") WITH ('connector' = 'print') ";
> bsTableEnv.executeSql(sinkDDL);
>
> System.out.println(bsTableEnv.from("print_table").getSchema());
>
> Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random");
> bsTableEnv.executeSql("insert into print_table select * from " + table);

回复