EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);
        Map<String,String&gt; map = new HashMap<&gt;();
        map.put("table.exec.source.idle-timeout","1000 ms");
        
executionEnvironment.getConfig().setGlobalJobParameters(ParameterTool.fromMap(map));
        
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(executionEnvironment, settings);
        streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));
        String catalogName  = "cat1";
        streamTableEnv.registerCatalog(catalogName,new 
GenericInMemoryCatalog(catalogName,"db1"));
        streamTableEnv.useCatalog(catalogName);
        streamTableEnv.executeSql("CREATE TABLE kafka_table2 (\n" +
                        "  user_id STRING,\n" +
                        "  order_amount DOUBLE,\n" +
                        "  log_ts TIMESTAMP(3),\n" + 
                        "  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' 
SECOND\n"+
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'tp1',\n" +
                "  'properties.bootstrap.servers' = 'host1:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" + 
                "  'format' = 'csv'\n" +
                ")");
        streamTableEnv.executeSql("select user_id," +
                "sum(order_amount) as amt," +
                "tumble_start(log_ts,INTERVAL '5' SECOND) as tumbleStart,"+
                "tumble_end(log_ts,INTERVAL '5' SECOND) as tumbleEnd " +
                "from kafka_table2 group by user_id,tumble(log_ts,INTERVAL '5' 
SECOND)").print();
kafka ??????????, ??????????????????????????, ????3??????????????????????,?????

回复