????flink sql 1.12 ????????????minibatch??????
val config = tConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is enabled
config.setString("table.exec.mini-batch.allow-latency", "true") 
config.setString("table.exec.mini-batch.size", 100) 
config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
config.setString("table.optimizer.distinct-agg.split.enabled", "true") //not 
support user defined AggregateFunctionsql??tableEnv.executeSql(
  s"""insert into event_test
     |select
     |      date_format(create_time,'yyyyMMdd') dt,
     |      uid,
     |      count(distinct fid) text_feed_count,
     |      max(event_time) event_time
     |    from basic_fd_base where ftype <>'0' and uid is not null
     |      group by
     |       date_format(create_time,'yyyyMMdd'),
     |       uid
""".stripMargin).print()event_test ????print connectorbasic_fd_base ????kafka 
connector????????????timestamp????????????????????eventtime????????????null????????????????????????????????????null??2>
 +I(20210427,747873111868904192,1,2021-04-27T14:03:10)
2> +I(20210427,709531067945685120,1,null)
2> +I(20210427,759213633292150016,1,2021-04-27T13:59:01.923)
2> +I(20210427,758340406550406272,4,2021-04-27T14:02:14.553)
2> +I(20210427,759658063329437312,1,2021-04-27T14:02:18.305)
2> +I(20210427,737415823706231680,1,2021-04-27T14:02:11.061)
2> +I(20210427,xishuashu...@sohu.com,1,2021-04-27T14:05:37)
2> +I(20210427,759219266892539008,1,null)
2> +I(20210427,758349976605763328,1,2021-04-27T14:02:24.184)
2> -U(20210427,709531067945685120,1,null)
2> +U(20210427,709531067945685120,1,2021-04-27T14:09:27.156)
2> +I(20210427,751664239562922752,1,2021-04-27T14:16:51.133)
2> -U(20210427,759219266892539008,1,null)
2> +U(20210427,759219266892539008,1,2021-04-27T14:12:40.692)
2> +I(20210427,745540385069273984,1,2021-04-27T14:23:34)
2> +I(20210427,745399833011098240,1,2021-04-27T14:20:32.870)
2> +I(20210427,714590484395398016,1,2021-04-27T14:19:06)
2> +I(20210427,747859173236216832,1,2021-04-27T14:28:21.864)
2> +I(20210427,746212052309316608,1,null)
2> +I(20210427,666839205279797376,1,2021-04-27T14:26:36.743)
2> +I(20210427,758334362541565568,3,2021-04-27T14:18:58.396)
2> +I(20210427,758325137706788480,1,2021-04-27T14:01:09.053)
2> +I(20210427,747837209624908800,1,2021-04-27T13:59:44.193)
2> -U(20210427,758388594254750720,1,2021-04-27T14:00:44.212)
2> +U(20210427,758388594254750720,4,2021-04-27T14:14:55)
2> +I(20210427,759466217777079296,1,2021-04-27T14:25:59.019)
2> -U(20210427,762769243539450496,1,2021-04-27T14:04:29)
2> +U(20210427,762769243539450496,2,2021-04-27T14:04:29)
2> +I(20210427,720648040456852096,1,2021-04-27T14:19:38.680)
2> +I(20210427,750144041584368000,1,2021-04-27T14:29:25.621)
2> +I(20210427,713108045701517952,1,null)
??????minibatch????????????????????????????????????????????????????????????????null??????????????????????????????

回复