Hey,

The current code supports two types of aggregations: a simple binary reduce, T,T => T, and the grouped version of it, where the reduce function is applied per user-defined key (so there we keep a map of reduced values). This can already be used to implement fairly complex logic if we transform the data to a proper type before passing it to the reducer.
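To make this a bit more concrete, the user-facing side looks roughly like this (only a sketch: the exact method names and signatures may differ slightly from the PR, and I am assuming inputs like DataStream<Long> values and DataStream<Tuple2<String, Long>> pairs):

    // simple binary reduce (T,T => T): a running sum over the whole stream,
    // emitted periodically through the trigger
    DataStream<Long> sums = values
        .reduce((a, b) -> a + b)
        .every(Time.of(4, TimeUnit.SECONDS));

    // grouped version: the same reduce applied per user-defined key,
    // internally we keep one reduced value per key in a map
    DataStream<Tuple2<String, Long>> sumsPerKey = pairs
        .groupBy(0)
        .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
        .every(Time.of(4, TimeUnit.SECONDS));

    // "transform to a proper type first": a running average becomes a plain
    // binary reduce once every element is mapped to a (sum, count) pair;
    // dividing f0 by f1 downstream gives the actual average
    DataStream<Tuple2<Double, Long>> averages = values
        .map(v -> new Tuple2<>((double) v, 1L))
        .reduce((a, b) -> new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1))
        .every(Time.of(4, TimeUnit.SECONDS));

The third example is what I mean by transforming the data first: once the elements carry a (sum, count) pair, the average is just another binary reduce whose partial results can be merged in parallel.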
As a next step we can make this work with folds + combiners as well, where the initial data type is T, the fold function is T,R => R, and the combiner is R,R => R. At that point I think any sensible aggregation can be implemented.
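To connect this to Bruno's synopsis question, an (approximate) distinct count would then look roughly like this (again just a sketch: fold/combine/every are placeholder names rather than final API, and I use a plain HashSet where a real job would keep a proper synopsis such as a HyperLogLog sketch):

    // assuming a DataStream<String> items as input

    // fold (T,R => R): add each incoming item to the synopsis
    // combiner (R,R => R): merge partial synopses from parallel instances
    DataStream<Set<String>> synopses = items
        .fold(new HashSet<String>(),
              (item, synopsis) -> { synopsis.add(item); return synopsis; })
        .combine((left, right) -> { left.addAll(right); return left; })
        .every(Time.of(4, TimeUnit.SECONDS));

    // the distinct count is then just the size of the emitted synopsis
    // (approximate once a real sketch replaces the HashSet)
    DataStream<Integer> distinctCounts = synopses.map(Set::size);

The combiner is the important part here: it is what allows the per-partition synopses to be merged, so this can run in parallel just like the reduce case.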
Regards,
Gyula

On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna <[email protected]> wrote:

> Hi Gyula,
>
> fair enough! I used a bad example.
>
> What I really wanted to know is whether your code supports only
> aggregations like sum, min, and max, where you need to pass only a value
> to the next aggregation, or also more complex data structures, e.g., a
> synopsis of the full stream, to compute an aggregation such as an
> approximate count distinct (item count)?
>
> Cheers,
> Bruno
>
> On 21.04.2015 15:18, Gyula Fóra wrote:
> > You are right, but you should never try to compute a full stream
> > median, that's the point :D
> >
> > On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <[email protected]> wrote:
> >
> > Hi Gyula,
> >
> > I read your comments on your PR.
> >
> > I have a question about this comment:
> >
> > "It only allows aggregations so we dont need to keep the full
> > history in a buffer."
> >
> > What if the user implements an aggregation function like a median?
> > For a median you need the full history, don't you?
> > Am I missing something?
> >
> > Cheers, Bruno
> >
> > On 21.04.2015 14:31, Gyula Fóra wrote:
> >>>> I have opened a PR for this feature:
> >>>>
> >>>> https://github.com/apache/flink/pull/614
> >>>>
> >>>> Cheers, Gyula
> >>>>
> >>>> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra <[email protected]> wrote:
> >>>>
> >>>>> That's a good idea, I will modify my PR to that :)
> >>>>>
> >>>>> Gyula
> >>>>>
> >>>>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske <[email protected]> wrote:
> >>>>>
> >>>>>> Is it possible to switch the order of the statements, i.e.,
> >>>>>>
> >>>>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> >>>>>> dataStream.reduce(...).every(Time.of(4,sec))
> >>>>>>
> >>>>>> I think that would be more consistent with the structure
> >>>>>> of the remaining API.
> >>>>>>
> >>>>>> Cheers, Fabian
> >>>>>>
> >>>>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra <[email protected]>:
> >>>>>>
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> Of course you can do that as well. (That's the good part :p )
> >>>>>>>
> >>>>>>> I will open a PR soon with the proposed changes (first without
> >>>>>>> breaking the current API) and I will post it here.
> >>>>>>>
> >>>>>>> Cheers, Gyula
> >>>>>>>
> >>>>>>> On Tuesday, April 21, 2015, Bruno Cadonna <[email protected]> wrote:
> >>>>>>>
> >>>> Hi Gyula,
> >>>>
> >>>> I have a question regarding your suggestion.
> >>>>
> >>>> Can the current continuous aggregation also be specified with your
> >>>> proposed periodic aggregation? I am thinking about something like
> >>>>
> >>>> dataStream.reduce(...).every(Count.of(1))
> >>>>
> >>>> Cheers, Bruno
> >>>>
> >>>> On 20.04.2015 22:32, Gyula Fóra wrote:
> >>>>>>>>>> Hey all,
> >>>>>>>>>>
> >>>>>>>>>> I think we are missing a quite useful feature that could be
> >>>>>>>>>> implemented (with some slight modifications) on top of the
> >>>>>>>>>> current windowing API.
> >>>>>>>>>>
> >>>>>>>>>> We currently provide 2 ways of aggregating (or reducing) over
> >>>>>>>>>> streams: doing a continuous aggregation and always outputting
> >>>>>>>>>> the aggregated value (which cannot be done properly in
> >>>>>>>>>> parallel), or doing the aggregation in a window periodically.
> >>>>>>>>>>
> >>>>>>>>>> What we don't have at the moment is periodic aggregations on
> >>>>>>>>>> the whole stream. I would even go as far as to remove the
> >>>>>>>>>> continuously outputting reduce/aggregate and replace it with
> >>>>>>>>>> this version, as this in return can be done properly in
> >>>>>>>>>> parallel.
> >>>>>>>>>>
> >>>>>>>>>> My suggestion would be that a call:
> >>>>>>>>>>
> >>>>>>>>>> dataStream.reduce(..)
> >>>>>>>>>> dataStream.sum(..)
> >>>>>>>>>>
> >>>>>>>>>> would return a windowed data stream where the window is the
> >>>>>>>>>> whole record history, and the user would need to define a
> >>>>>>>>>> trigger to get the actual reduced values, like:
> >>>>>>>>>>
> >>>>>>>>>> dataStream.reduce(...).every(Time.of(4,sec))
> >>>>>>>>>> dataStream.sum(...).every(...)
> >>>>>>>>>>
> >>>>>>>>>> I think the current data stream reduce/aggregation is very
> >>>>>>>>>> confusing without being practical for any normal use case.
> >>>>>>>>>>
> >>>>>>>>>> Also, this would be a very API-breaking change (but I would
> >>>>>>>>>> still make it, as it is much more intuitive than the current
> >>>>>>>>>> behaviour), so I would try to push it before the release if
> >>>>>>>>>> we can agree.
> >>>>>>>>>>
> >>>>>>>>>> Cheers, Gyula
>
> --
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> Dr. Bruno Cadonna
> Postdoctoral Researcher
>
> Databases and Information Systems
> Department of Computer Science
> Humboldt-Universität zu Berlin
>
> http://www.informatik.hu-berlin.de/~cadonnab
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
