?????? ??????????????????StreamTableEnvironment.from("")???????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? ??????????????package kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Test3 { public static void main(String[] args) { // ???? StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // ?????????? String inTablePath = "CREATE TABLE datagen ( " + " id INT, " + " total string, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.id.min'='1', " + " 'fields.id.max'='10', " + " 'fields.total.length'='10' " + ")"; // ?????????? bsTableEnv.executeSql(inTablePath); Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from datagen"); bsTableEnv.createTemporaryView("table1", table); Table table1 = bsTableEnv.from("table1"); System.out.println(table1); // ??????????????????????????????table1???????????????????? Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from table1"); System.out.println(queryT.getSchema()); bsTableEnv.sqlQuery("select table1.id from " + bsTableEnv.from("table1")); } }