Stream aggregation using Flink Table API (Blink plan)

2020-11-09 Thread Felipe Gutierrez
Hi community, I am testing the "Split Distinct Aggregation" [1] consuming the taxi ride data set. My SQL query from the table environment is the one below: Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate"); and I am enabling: conf

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-09 Thread Felipe Gutierrez
I realized that I forgot the image. Now it is attached. -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-09 Thread Jark Wu
Hi Felipe, The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option, only works for distinct aggregations (e.g. COUNT(DISTINCT ...)). However, the query in your example is using COUNT(driverId). You can update it to COUNT(DISTINCT driverId), and it should sh
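
To illustrate why the split only applies to distinct aggregations, here is a minimal plain-Java sketch (not Flink code) of the idea behind it: COUNT(DISTINCT driverId) per startDate is first computed per (startDate, bucket), where the bucket comes from a hash of driverId, and the partial counts are then summed per startDate. The class name, the Ride type, the sample data, and the bucket count of 4 are assumptions made for this example only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration only: a hypothetical in-memory model, not the Flink planner.
class SplitDistinctSketch {

    // Hypothetical record type mirroring the columns used in the thread's query.
    static final class Ride {
        final String startDate;
        final long driverId;

        Ride(String startDate, long driverId) {
            this.startDate = startDate;
            this.driverId = driverId;
        }
    }

    // Made-up sample data for the example.
    static List<Ride> sampleRides() {
        return Arrays.asList(
                new Ride("2020-11-09", 1), new Ride("2020-11-09", 2),
                new Ride("2020-11-09", 1), new Ride("2020-11-09", 3),
                new Ride("2020-11-10", 2), new Ride("2020-11-10", 2));
    }

    // One phase: a single distinct set per startDate (one hot key holds all ids).
    static Map<String, Long> countDistinctOnePhase(List<Ride> rides) {
        Map<String, Set<Long>> seen = new HashMap<>();
        for (Ride r : rides) {
            seen.computeIfAbsent(r.startDate, k -> new HashSet<>()).add(r.driverId);
        }
        Map<String, Long> result = new HashMap<>();
        seen.forEach((date, ids) -> result.put(date, (long) ids.size()));
        return result;
    }

    // Two phases: distinct count per (startDate, bucket), then a sum per startDate.
    // A given driverId always lands in the same bucket, so the buckets are disjoint
    // and the partial counts can simply be added.
    static Map<String, Long> countDistinctSplit(List<Ride> rides, int buckets) {
        Map<String, Map<Integer, Set<Long>>> partial = new HashMap<>();
        for (Ride r : rides) {
            int bucket = Math.floorMod(Long.hashCode(r.driverId), buckets);
            partial.computeIfAbsent(r.startDate, k -> new HashMap<>())
                    .computeIfAbsent(bucket, k -> new HashSet<>())
                    .add(r.driverId);
        }
        Map<String, Long> result = new HashMap<>();
        partial.forEach((date, byBucket) -> {
            long total = 0;
            for (Set<Long> ids : byBucket.values()) {
                total += ids.size();
            }
            result.put(date, total);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Ride> rides = sampleRides();
        System.out.println(countDistinctOnePhase(rides).equals(countDistinctSplit(rides, 4)));
    }
}
```

A plain COUNT(driverId) has no distinct set to spread across buckets, which is consistent with the query plan staying unchanged until DISTINCT is added.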

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Felipe Gutierrez
Hi Jark, thanks for your reply. Indeed, I forgot to write DISTINCT in the query, and now the query plan splits into two hash partition phases. What do you mean by deterministic time? Why is only the window aggregate deterministic? If I implement the ProcessingTimeCallback [1] interface is it

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Timo Walther
Hi Felipe, by non-deterministic, Jark meant that you never know whether the mini-batch timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the execution. This depends on how fast records arrive at the operator. In general, processing time can be considered non-deterministic, because 1
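
The effect described above can be sketched in plain Java (not Flink code). The same six records are grouped into different batches depending only on their arrival times. The class name and the trigger model are simplifying assumptions for illustration: here the timer starts when the first record enters an empty buffer and its expiry is noticed only when the next record arrives, which is not necessarily how Flink schedules it.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a hypothetical model of "flush on timer OR on row threshold".
class MiniBatchTriggerSketch {

    // Returns the size of each flushed batch for the given arrival timestamps (ms).
    static List<Integer> batchSizes(long[] arrivalsMs, long allowLatencyMs, int maxSize) {
        List<Integer> flushed = new ArrayList<>();
        int buffered = 0;
        long firstArrival = 0;
        for (long t : arrivalsMs) {
            // Timer trigger: the oldest buffered record has waited long enough.
            if (buffered > 0 && t - firstArrival >= allowLatencyMs) {
                flushed.add(buffered);
                buffered = 0;
            }
            if (buffered == 0) {
                firstArrival = t;
            }
            buffered++;
            // Size trigger: the buffer reached the row threshold.
            if (buffered == maxSize) {
                flushed.add(buffered);
                buffered = 0;
            }
        }
        if (buffered > 0) {
            flushed.add(buffered); // flush whatever is left at end of input
        }
        return flushed;
    }

    public static void main(String[] args) {
        long[] fast = {0, 100, 200, 300, 400, 500};
        long[] slow = {0, 2000, 4000, 6000, 8000, 10000};
        System.out.println(batchSizes(fast, 3000, 3)); // prints [3, 3]
        System.out.println(batchSizes(slow, 3000, 3)); // prints [2, 2, 2]
    }
}
```

With fast arrivals the row threshold fires first; with slow arrivals the timer does, so the batch boundaries depend on wall-clock timing rather than on the data.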

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Felipe Gutierrez
I see, thanks Timo

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Felipe Gutierrez
Hi Jark, I don't get the difference between the "MiniBatch Aggregation" and the "Local-Global Aggregation". On the web page [1] it says that I have to enable the TWO_PHASE parameter. So I compared the query plans from both, with and without the TWO_PHASE parameter. And they are the same.

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Jark Wu
Hi Felipe, The default value of `table.optimizer.agg-phase-strategy` is AUTO: if mini-batch is enabled, it will use TWO-PHASE, otherwise ONE-PHASE. https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
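
As a rough picture of what a two-phase plan does, the plain-Java sketch below (not Flink code) pre-aggregates each mini-batch locally into partial counts and then merges the partials in a global step. The class name, method names, and sample keys are assumptions for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a hypothetical model of local pre-aggregation plus global merge.
class LocalGlobalSketch {

    // Local phase: count keys within one mini-batch before any shuffle.
    static Map<String, Long> localCount(List<String> batch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : batch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: merge the partial counts, one entry per key per batch
    // instead of one entry per input record.
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> total = globalMerge(Arrays.asList(
                localCount(Arrays.asList("a", "a", "b")),
                localCount(Arrays.asList("a", "b", "b"))));
        System.out.println(total.get("a") + " " + total.get("b")); // prints 3 3
    }
}
```

The local phase needs a buffer of records to pre-aggregate, which fits with AUTO choosing the two-phase strategy only when mini-batch is enabled.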

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Felipe Gutierrez
I see. Now it has different query plans. It was documented on another page so I got confused. Thanks!