????????????????????????????????????
1. ????
package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenTest {

    /**
     * Creates a {@code datagen} source table, then runs a one-second tumbling-window count
     * grouped by {@code f_random} and prints the result to stdout.
     *
     * <p>Checkpointing is enabled explicitly. For streaming jobs, the Flink release this API
     * style targets documents that {@code TableResult.print()} only hands a row to the client
     * after the checkpoint covering it has completed, and checkpointing is off by default.
     * Without it the job prints the table header and then appears to hang.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every second so print() can release rows shortly after each
        // one-second window fires; the interval is in milliseconds.
        bsEnv.enableCheckpointing(1000L);

        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Random-data source: 20 rows per second, f_random in [1, 10], a 10-character
        // random string, and a computed column ts that also serves as the watermark.
        String sourceTableDDL = "CREATE TABLE datagen ( " +
                " f_random INT, " +
                " f_random_str STRING, " +
                " ts AS localtimestamp, " +
                " WATERMARK FOR ts AS ts " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='20', " +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='10', " +
                " 'fields.f_random_str.length'='10' " +
                ")";

        bsTableEnv.executeSql(sourceTableDDL);

        // The source is unbounded, so print() keeps running until the job is cancelled.
        bsTableEnv.executeSql("SELECT f_random, count(1) " +
                "FROM datagen " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

    }

}
2. ????????
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
+-------------+----------------------+
|    f_random |               EXPR$1 |
+-------------+----------------------+

回复