????????????????????????????????????1. ????package kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class DataGenTest {
public static void main(String[] args) {
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String sourceTableDDL = "CREATE TABLE datagen ( " +
" f_random INT, " +
" f_random_str STRING, " +
" ts AS localtimestamp, " +
" WATERMARK FOR ts AS ts " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='20', " +
" 'fields.f_random.min'='1', " +
" 'fields.f_random.max'='10', " +
" 'fields.f_random_str.length'='10' " +
")";
bsTableEnv.executeSql(sourceTableDDL);
bsTableEnv.executeSql("SELECT f_random, count(1) " +
"FROM datagen " +
"GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();
}
}2. ????????log4j:WARN No appenders could be found for logger
(org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the
log4j system properly. log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
+-------------+----------------------+ | f_random | EXPR$1 |
+-------------+----------------------+