I see, thanks Timo.

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
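Timo's and Jark's explanations further down this thread say that a mini-batch can be closed either by its size threshold or by its timeout, depending on how fast records arrive. The Java model below illustrates that point. It is a hypothetical simplification, not Flink's implementation. The class `MiniBatchSketch` and its methods are invented for illustration. `maxSize` and `allowLatencyMs` only loosely mirror `table.exec.mini-batch.size` and `table.exec.mini-batch.allow-latency`. The model treats the timeout as time elapsed since the first buffered record and checks it only when a record arrives. The sketch replays the same six records with two different arrival timings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of mini-batch buffering in front of an
// unbounded "GROUP BY key, COUNT(*)" aggregate. Not Flink code.
final class MiniBatchSketch {
    private final int maxSize;         // loosely mirrors table.exec.mini-batch.size
    private final long allowLatencyMs; // loosely mirrors table.exec.mini-batch.allow-latency

    // Per-key partial counts for the batch currently being buffered.
    private final Map<String, Long> buffer = new HashMap<>();
    private int bufferedRecords = 0;
    private long batchStartMs = 0;

    // Stand-in for keyed state, plus bookkeeping for the demo.
    final Map<String, Long> state = new HashMap<>();
    int stateAccesses = 0;
    final List<String> flushReasons = new ArrayList<>();

    MiniBatchSketch(int maxSize, long allowLatencyMs) {
        this.maxSize = maxSize;
        this.allowLatencyMs = allowLatencyMs;
    }

    void onRecord(String key, long nowMs) {
        // Simplification: the timeout is only checked when a record arrives.
        if (bufferedRecords > 0 && nowMs - batchStartMs >= allowLatencyMs) {
            flush("timeout");
        }
        if (bufferedRecords == 0) {
            batchStartMs = nowMs; // the first record opens a new batch
        }
        buffer.merge(key, 1L, Long::sum); // aggregate in memory, no state access
        bufferedRecords++;
        if (bufferedRecords >= maxSize) {
            flush("size");
        }
    }

    void flush(String reason) {
        // One read-modify-write of state per distinct key in the batch,
        // instead of one per record.
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            state.merge(e.getKey(), e.getValue(), Long::sum);
            stateAccesses++;
        }
        buffer.clear();
        bufferedRecords = 0;
        flushReasons.add(reason);
    }

    // Replays records with the given arrival times, then flushes the remainder.
    static MiniBatchSketch replay(String[] keys, long[] arrivalMs, int maxSize, long allowLatencyMs) {
        MiniBatchSketch op = new MiniBatchSketch(maxSize, allowLatencyMs);
        for (int i = 0; i < keys.length; i++) {
            op.onRecord(keys[i], arrivalMs[i]);
        }
        if (op.bufferedRecords > 0) {
            op.flush("end");
        }
        return op;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "b", "a", "b", "b"};
        MiniBatchSketch fast = replay(keys, new long[] {0, 1, 2, 3, 4, 5}, 3, 3000);
        MiniBatchSketch slow = replay(keys, new long[] {0, 4000, 8000, 12000, 16000, 20000}, 3, 3000);
        System.out.println("fast: " + fast.state + " flushes=" + fast.flushReasons + " stateAccesses=" + fast.stateAccesses);
        System.out.println("slow: " + slow.state + " flushes=" + slow.flushReasons + " stateAccesses=" + slow.stateAccesses);
    }
}
```

In this model, both runs end with the same counts (a=3, b=3). The fast run closes two batches on size and touches state 4 times for 6 records. In the slow run, every batch is closed by the timeout, and state is touched 6 times, once per record. The batch boundaries, and therefore the intermediate updates, depend on arrival timing rather than on the data.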
On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <twal...@apache.org> wrote:
>
> Hi Felipe,
>
> by "non-deterministic", Jark meant that you never know whether the mini-batch
> timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) triggers the
> execution. This depends on how fast records arrive at the operator.
>
> In general, processing time can be considered non-deterministic, because
> 100 ms are not necessarily 100 ms. This depends on the CPU load and other
> tasks such as garbage collection. Only event time (and thus event-time
> windows), which works on the timestamps in the data instead of machine time,
> is deterministic.
>
> Regards,
> Timo
>
>
> On 10.11.20 12:02, Felipe Gutierrez wrote:
> > Hi Jark,
> >
> > thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
> > and now the query plan is split into two hash partition phases.
> >
> > What do you mean by deterministic time? Why is only the window aggregate
> > deterministic? If I implement the ProcessingTimeCallback [1] interface,
> > is it deterministic?
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
> >
> > Thanks
> >
> > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> Hi Felipe,
> >>
> >> The "Split Distinct Aggregation", i.e. the
> >> "table.optimizer.distinct-agg.split.enabled" option,
> >> only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
> >>
> >> However, the query in your example uses COUNT(driverId).
> >> You can change it to COUNT(DISTINCT driverId), and it should show two hash
> >> phases.
> >>
> >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time
> >> window aggregation.
> >>
> >> 1) MiniBatch is just an optimization for unbounded aggregation: it buffers
> >> some input records in memory and processes them together to reduce state
> >> access. A processing-time window, by contrast, still accesses state once
> >> per record. Besides, the local aggregation also applies mini-batch: it only
> >> sends the accumulator of the current mini-batch to the downstream global
> >> aggregation, which improves performance a lot.
> >> 2) The size of a mini-batch is not deterministic. It may be triggered by the
> >> number of records or by a timeout, whereas a window aggregate is triggered
> >> at a deterministic time.
> >>
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez
> >> <felipe.o.gutier...@gmail.com> wrote:
> >>>
> >>> I realized that I forgot the image. Now it is attached.
> >>>
> >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>>>
> >>>> Hi community,
> >>>>
> >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi
> >>>> ride data set.
> >>>> My SQL query from the table environment is the one below:
> >>>>
> >>>>   Table tableCountDistinct = tableEnv.sqlQuery(
> >>>>       "SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");
> >>>>
> >>>> and I am enabling:
> >>>>
> >>>>   configuration.setString("table.exec.mini-batch.enabled", "true");
> >>>>   configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
> >>>>   configuration.setString("table.exec.mini-batch.size", "5000");
> >>>>   configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
> >>>>
> >>>> and finally:
> >>>>
> >>>>   configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
> >>>>
> >>>> I was expecting the query plan in the Web UI to show two hash phases, as
> >>>> in the image in [1]. Instead, it shows the same plan with one hash phase,
> >>>> as if I were deploying only one local aggregate and one global aggregate
> >>>> (taking the parallel instances into consideration, of course). Please see
> >>>> the attached image of the query execution plan.
> >>>>
> >>>> Is there something that I am missing when I configure the Table API?
> >>>> By the way, I am a bit confused by the "MiniBatch Aggregation" [2].
> >>>> Does the "MiniBatch Aggregation" aggregate like a processing-time window
> >>>> on the operator after the hash phase? If so, isn't it the same as a
> >>>> window aggregation, rather than the unbounded aggregation the example
> >>>> presents?
> >>>>
> >>>> Thanks!
> >>>> Felipe
> >>>>
> >>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
> >>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
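Jark's fix was to switch from `COUNT(driverId)` to `COUNT(DISTINCT driverId)`. That change lets the optimizer apply the rewrite described in the "Split Distinct Aggregation" documentation linked above. The rewrite works in two steps. First, the distinct aggregate is computed per original key plus a bucket derived from a hash of the distinct key. A second aggregation then sums those partial counts per original key. These two keyings correspond to the two hash phases Felipe expected to see in the Web UI.

The Java code below models that rewrite in a single process for the TaxiRide query. The `Ride` class, the bucket counts, and the sample data are hypothetical. The code only shows why the two-step result equals a plain `COUNT(DISTINCT ...)`. It does not show how Flink distributes or executes the plan.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative single-process model of the "split distinct aggregation" rewrite of
//   SELECT startDate, COUNT(DISTINCT driverId) FROM TaxiRide GROUP BY startDate
// Not Flink's implementation; Ride and the bucket counts are hypothetical.
final class SplitDistinctSketch {
    // Hypothetical row type holding only the two columns the query uses.
    static final class Ride {
        final String startDate;
        final long driverId;

        Ride(String startDate, long driverId) {
            this.startDate = startDate;
            this.driverId = driverId;
        }
    }

    // Reference result: one aggregation keyed only by startDate.
    static Map<String, Long> directDistinctCount(List<Ride> rides) {
        Map<String, Set<Long>> seen = new HashMap<>();
        for (Ride r : rides) {
            seen.computeIfAbsent(r.startDate, k -> new HashSet<>()).add(r.driverId);
        }
        Map<String, Long> out = new HashMap<>();
        seen.forEach((date, ids) -> out.put(date, (long) ids.size()));
        return out;
    }

    // Split version: phase 1 keyed by (startDate, bucket), phase 2 keyed by startDate.
    static Map<String, Long> splitDistinctCount(List<Ride> rides, int buckets) {
        // Phase 1: COUNT(DISTINCT driverId) per (startDate, bucket). A driverId always
        // hashes to the same bucket, so the per-bucket sets of one date never overlap.
        Map<String, Map<Integer, Set<Long>>> phase1 = new HashMap<>();
        for (Ride r : rides) {
            int bucket = Math.floorMod(Long.hashCode(r.driverId), buckets);
            phase1.computeIfAbsent(r.startDate, k -> new HashMap<>())
                    .computeIfAbsent(bucket, k -> new HashSet<>())
                    .add(r.driverId);
        }
        // Phase 2: SUM of the partial distinct counts per startDate.
        Map<String, Long> out = new HashMap<>();
        phase1.forEach((date, perBucket) -> {
            long total = 0;
            for (Set<Long> ids : perBucket.values()) {
                total += ids.size();
            }
            out.put(date, total);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Ride> rides = Arrays.asList(
                new Ride("2020-11-09", 1L), new Ride("2020-11-09", 2L),
                new Ride("2020-11-09", 1L), new Ride("2020-11-09", 7L),
                new Ride("2020-11-10", 2L), new Ride("2020-11-10", 2L));
        System.out.println("direct: " + directDistinctCount(rides));
        System.out.println("split:  " + splitDistinctCount(rides, 4));
    }
}
```

A given `driverId` always falls into the same bucket, so summing the per-bucket counts cannot double-count a driver. In Flink, the practical gain comes from the first phase: it spreads a hot `startDate` across several parallel instances instead of sending all of its records to one.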