Re: Periodic full stream aggregations
Hi Bruno,

Of course you can do that as well. (That's the good part :p ) I will open a PR soon with the proposed changes (first without breaking the current API) and I will post it here.

Cheers,
Gyula

On Tuesday, April 21, 2015, Bruno Cadonna cado...@informatik.hu-berlin.de wrote:

Hi Gyula,

I have a question regarding your suggestion. Can the current continuous aggregation also be specified with your proposed periodic aggregation? I am thinking about something like dataStream.reduce(...).every(Count.of(1)).

Cheers,
Bruno
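For illustration, a minimal plain-Java sketch (not Flink code) of why a count-based trigger of 1 reproduces the current continuous behaviour; the reduceEvery helper and the sample numbers are made up for this example:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java illustration of trigger semantics: a count-1 trigger emits the running
 *  aggregate on every record, i.e. the old continuous aggregation. */
public class CountTriggerSketch {

    /** Reduces the input and emits the current full-stream aggregate every 'every' elements. */
    static List<Integer> reduceEvery(List<Integer> input, BinaryOperator<Integer> reducer, int every) {
        List<Integer> emitted = new ArrayList<>();
        Integer agg = null;
        int seen = 0;
        for (int value : input) {
            agg = (agg == null) ? value : reducer.apply(agg, value);
            if (++seen % every == 0) {
                emitted.add(agg); // trigger fires: emit the current full-stream aggregate
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        // every(Count.of(1)) behaviour: one output per input record, i.e. continuous aggregation
        System.out.println(reduceEvery(input, Integer::sum, 1)); // [1, 3, 6, 10]
        // every(Count.of(2)) behaviour: periodic outputs
        System.out.println(reduceEvery(input, Integer::sum, 2)); // [3, 10]
    }
}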
Re: Periodic full stream aggregations
Is it possible to switch the order of the statements, i.e.,

dataStream.every(Time.of(4, sec)).reduce(...)

instead of

dataStream.reduce(...).every(Time.of(4, sec))?

I think that would be more consistent with the structure of the remaining API.

Cheers,
Fabian
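For illustration, a hypothetical interface sketch (all names made up, not Flink's actual API) of how the two orderings shape the fluent API; the choice decides whether the intermediate type carries every(...) or reduce(...):

import java.util.function.BinaryOperator;

interface Trigger {}

interface DataStream<T> {
    // ordering from the original proposal: aggregate first, then attach the trigger
    PendingAggregation<T> reduce(BinaryOperator<T> reducer);
    // ordering suggested here: pick the trigger first, then the aggregation
    TriggeredStream<T> every(Trigger trigger);
}

interface PendingAggregation<T> {
    DataStream<T> every(Trigger trigger);
}

interface TriggeredStream<T> {
    DataStream<T> reduce(BinaryOperator<T> reducer);
}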
Re: Periodic full stream aggregations
Hey,

The current code supports two types of aggregations: a simple binary reduce, (T, T) => T, and the grouped version of it, where the reduce function is applied per user-defined key (so there we keep a map of reduced values). This can already be used to implement fairly complex logic if we transform the data to a proper type before passing it to the reducer.

As a next step we can make this work with fold + combiners as well, where the initial data type is T, the fold function is (T, R) => R and the combiner is (R, R) => R. At that point I think any sensible aggregation can be implemented.

Regards,
Gyula

On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna cado...@informatik.hu-berlin.de wrote:

Hi Gyula, fair enough! I used a bad example. What I really wanted to know is whether your code supports only aggregations like sum, min, and max, where you only need to pass a value on to the next aggregation, or also more complex data structures, e.g., a synopsis of the full stream, to compute an aggregation such as an approximate count distinct?
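For illustration, a rough plain-Java sketch of the two signatures described above (illustrative helpers, not Flink's actual interfaces); the fold example counts elements, so the accumulator type differs from the element type:

import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

public class AggregationSignatures {

    /** Binary reduce, (T, T) => T: partial results have the same type as the stream elements. */
    static <T> T reduce(List<T> elements, BinaryOperator<T> reducer) {
        T acc = elements.get(0);
        for (int i = 1; i < elements.size(); i++) {
            acc = reducer.apply(acc, elements.get(i));
        }
        return acc;
    }

    /** Fold, (T, R) => R, plus combiner (R, R) => R: the accumulator type R may differ from T,
     *  and the combiner merges accumulators produced by parallel instances. */
    static <T, R> R foldAndCombine(List<List<T>> partitions, R initial,
                                   BiFunction<T, R, R> fold, BinaryOperator<R> combiner) {
        R merged = initial;
        for (List<T> partition : partitions) {
            R acc = initial;                       // each parallel instance folds its own partition
            for (T element : partition) {
                acc = fold.apply(element, acc);
            }
            merged = combiner.apply(merged, acc);  // partial accumulators are then combined
        }
        return merged;
    }

    public static void main(String[] args) {
        // reduce: sum of ints, (Integer, Integer) => Integer
        System.out.println(reduce(Arrays.asList(1, 2, 3, 4), Integer::sum)); // 10
        // fold + combine: count elements, accumulator type (Long) differs from element type (String)
        System.out.println(foldAndCombine(
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")),
                0L, (element, count) -> count + 1, Long::sum)); // 3
    }
}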
Re: Periodic full stream aggregations
Hi Gyula,

fair enough! I used a bad example. What I really wanted to know is whether your code supports only aggregations like sum, min, and max, where you only need to pass a value on to the next aggregation, or also more complex data structures, e.g., a synopsis of the full stream, to compute an aggregation such as an approximate count distinct?

Cheers,
Bruno

On 21.04.2015 15:18, Gyula Fóra wrote:

You are right, but you should never try to compute a full-stream median, that's the point :D

On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna cado...@informatik.hu-berlin.de wrote:

Hi Gyula, I read the comments on your PR. I have a question about this comment: "It only allows aggregations so we don't need to keep the full history in a buffer." What if the user implements an aggregation function like a median? For a median you need the full history, don't you? Am I missing something?

On 21.04.2015 14:31, Gyula Fóra wrote:

I have opened a PR for this feature: https://github.com/apache/flink/pull/614

Cheers,
Gyula
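For illustration, a minimal plain-Java sketch (not Flink code) of such a synopsis: a crude linear-counting distinct-count estimate whose state fits the fold (T, R) => R / combine (R, R) => R pattern described above; the class and helper names are made up:

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public class DistinctCountSynopsis {

    static final int BITS = 1 << 16; // fixed-size bitmap, same size on every parallel instance

    /** Fold step: mark the bucket the element hashes into. */
    static BitSet add(String element, BitSet synopsis) {
        BitSet updated = (BitSet) synopsis.clone();
        updated.set(Math.floorMod(element.hashCode(), BITS));
        return updated;
    }

    /** Combine step: merging two synopses is a bitwise OR, so it parallelizes cleanly. */
    static BitSet combine(BitSet a, BitSet b) {
        BitSet merged = (BitSet) a.clone();
        merged.or(b);
        return merged;
    }

    /** Linear-counting estimate of the number of distinct elements seen. */
    static double estimate(BitSet synopsis) {
        int zeros = BITS - synopsis.cardinality();
        return -BITS * Math.log((double) zeros / BITS);
    }

    public static void main(String[] args) {
        List<String> part1 = Arrays.asList("a", "b", "a");
        List<String> part2 = Arrays.asList("b", "c");

        BitSet s1 = new BitSet(BITS);
        for (String e : part1) s1 = add(e, s1);   // folded on one parallel instance
        BitSet s2 = new BitSet(BITS);
        for (String e : part2) s2 = add(e, s2);   // folded on another

        System.out.printf("approx. distinct: %.2f%n", estimate(combine(s1, s2))); // ~3
    }
}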
Re: Periodic full stream aggregations
That's a good idea, I will modify my PR to that :)

Gyula

On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske fhue...@gmail.com wrote:

Is it possible to switch the order of the statements, i.e., dataStream.every(Time.of(4, sec)).reduce(...) instead of dataStream.reduce(...).every(Time.of(4, sec))? I think that would be more consistent with the structure of the remaining API.

Cheers,
Fabian
Periodic full stream aggregations
Hey all,

I think we are missing a quite useful feature that could be implemented (with some slight modifications) on top of the current windowing API.

We currently provide two ways of aggregating (or reducing) over streams: doing a continuous aggregation that always outputs the latest aggregated value (which cannot be done properly in parallel), or doing the aggregation periodically over a window. What we don't have at the moment is periodic aggregation over the whole stream. I would even go as far as to remove the continuously outputting reduce/aggregate and replace it with this version, since this one can in turn be done properly in parallel.

My suggestion would be that a call like

dataStream.reduce(...)
dataStream.sum(...)

would return a windowed data stream where the window is the whole record history, and the user would need to define a trigger to get the actual reduced values, like:

dataStream.reduce(...).every(Time.of(4, sec))
dataStream.sum(...).every(...)

I think the current data stream reduce/aggregation is very confusing without being practical for any normal use case. This would also be a very API-breaking change (but I would still make it, as it is much more intuitive than the current behaviour), so I would try to push it before the release if we can agree.

Cheers,
Gyula
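For illustration, a minimal plain-Java sketch (not Flink code) of the parallelism argument made above: each parallel instance keeps only its local partial aggregate, and the full-stream value is combined only when the periodic trigger fires; the class name and sample data are made up:

import java.util.Arrays;
import java.util.List;

public class PeriodicFullStreamSum {

    public static void main(String[] args) {
        // two "parallel" partitions of the stream
        List<Integer> partition1 = Arrays.asList(1, 2, 3);
        List<Integer> partition2 = Arrays.asList(10, 20);

        long partial1 = 0, partial2 = 0;
        for (int v : partition1) partial1 += v;  // each instance reduces locally, no coordination
        for (int v : partition2) partial2 += v;

        // when the trigger fires (e.g. the proposed every(Time.of(4, sec))),
        // the partials are combined once to produce the full-stream aggregate
        System.out.println("full-stream sum so far: " + (partial1 + partial2)); // 36
    }
}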