hi, 之前查看邮件列表确实有看到很多地方提到executeSql是一个异步接口.但是我对这部分还是有一些疑惑
1.当inset into 的逻辑是简单逻辑的时候可以看到代码有输出,但替换为我最初发的有聚合逻辑的insert into sql 就无法显示输出了,为什么? 代码 ... tEnv.executeSql(sourceDDL); tEnv.executeSql(sinkDDL); tEnv.executeSql("INSERT INTO print_sink SELECT user_id ,item_id,category_id ,behavior ,ts,proctime FROM user_behavior"); ... 控制台 3> +I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847) 3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847) 3> +I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847) 3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847) 聚合逻辑代码(source不变,sink 对应变更列) > String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" + > "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as > hour_of_day , COUNT(*) as buy_cnt\n" + > "FROM user_behavior\n" + > "WHERE behavior = 'buy'\n" + > "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)"; > > > > //注册source和sink > tEnv.executeSql(sourceDDL); > tEnv.executeSql(sinkDDL); > // tableResult.print(); > > tEnv.executeSql(transformationDDL); 2.没有太理解您说的 手动拿到那个executeSql的返回的TableResult,然后去 .... wait job finished 代码修改为如下 运行控制台还是没有结果打印 //注册source和sink tEnv.executeSql(sourceDDL); tEnv.executeSql(sinkDDL); TableResult tableResult = tEnv.executeSql(transformationDDL); tableResult.getJobClient() .get() .getJobExecutionResult(Thread.currentThread().getContextClassLoader()) .get().wait(); Best, DanielGu -- Sent from: http://apache-flink.147419.n8.nabble.com/