I see, thanks Timo

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <twal...@apache.org> wrote:
>
> Hi Felipe,
>
> with non-deterministic Jark meant that you never know if the mini batch
> timer (every 3 s) or the mini batch threshold (e.g. 3 rows) fires the
> execution. This depends how fast records arrive at the operator.
>
> In general, processing time can be considered non-deterministic, because
> 100ms must not be 100ms. This depends on the CPU load and other tasks
> such garbage collection etc. Only event-time (and thus event time
> windows) that work on the timestamp in the data instead of machine time
> is determistic,
>
> Regards,
> Timo
>
>
> On 10.11.20 12:02, Felipe Gutierrez wrote:
> > Hi Jark,
> >
> > thanks for your reply. Indeed, I forgot to write DISTINCT on the query
> > and now the query plan is splitting into two hash partition phases.
> >
> > what do you mean by deterministic time? Why only the window aggregate
> > is deterministic? If I implement the ProcessingTimeCallback [1]
> > interface is it deterministic?
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
> > Thanks
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> Hi Felipe,
> >>
> >> The "Split Distinct Aggregation", i.e. the 
> >> "table.optimizer.distinct-agg.split.enabled" option,
> >>   only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
> >>
> >> However, the query in your example is using COUNT(driverId).
> >> You can update it to COUNT(DISTINCT driverId), and it should show two hash 
> >> phases.
> >>
> >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time 
> >> window aggregation.
> >>
> >> 1) MiniBatch is just an optimization on unbounded aggregation, it buffers 
> >> some input records in memory
> >>   and processes them together to reduce the state accessing. But 
> >> processing-time window is still a per-record
> >>   state accessing style. Besides, the local aggregation also applies 
> >> mini-batch, it only sends the accumulator
> >>   of current this mini-batch to the downstream global aggregation, and 
> >> this improves performance a lot.
> >> 2) The size of MiniBach is not deterministic. It may be triggered by the 
> >> number of records or a timeout.
> >>    But a window aggregate is triggered by a deterministic time.
> >>
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez 
> >> <felipe.o.gutier...@gmail.com> wrote:
> >>>
> >>> I realized that I forgot the image. Now it is attached.
> >>> --
> >>> -- Felipe Gutierrez
> >>> -- skype: felipe.o.gutierrez
> >>> -- https://felipeogutierrez.blogspot.com
> >>>
> >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>>>
> >>>> Hi community,
> >>>>
> >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi
> >>>> ride data set. My sql query from the table environment is the one
> >>>> below:
> >>>>
> >>>> Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate,
> >>>> COUNT(driverId) FROM TaxiRide GROUP BY startDate");
> >>>>
> >>>> and I am enableing:
> >>>> configuration.setString("table.exec.mini-batch.enabled", "true");
> >>>> configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
> >>>> configuration.setString("table.exec.mini-batch.size", "5000");
> >>>> configuration.setString("table.optimizer.agg-phase-strategy", 
> >>>> "TWO_PHASE");
> >>>> and finally
> >>>> configuration.setString("table.optimizer.distinct-agg.split.enabled", 
> >>>> "true");
> >>>>
> >>>> I was expecting that the query plan at the WEB UI show to me two hash
> >>>> phases as it is present here on the image [1]. Instead, it is showing
> >>>> to me the same plan with one hash phase as I was deploying only one
> >>>> Local aggregate and one Global aggregate (of course, taking the
> >>>> parallel instances into consideration). Please see the query execution
> >>>> plan image attached.
> >>>>
> >>>> Is there something that I am missing when I config the Table API?
> >>>> By the way, I am a bit confused with the "MiniBatch Aggregation" [2].
> >>>> Is the "MiniBatch Aggregation" aggregating as a processing time window
> >>>> on the operator after the hash phase? If it is, isn't it the same as a
> >>>> window aggregation instead of an unbounded window as the example
> >>>> presents?
> >>>>
> >>>> Thanks!
> >>>> Felipe
> >>>>
> >>>> [1] 
> >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
> >>>> [2] 
> >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
> >>>> --
> >>>> -- Felipe Gutierrez
> >>>> -- skype: felipe.o.gutierrez
> >>>> -- https://felipeogutierrez.blogspot.com
> >
>

Reply via email to