You should have a look at this project: https://github.com/addthis/stream-lib

You can use it within Flink, storing the intermediate values in local state.
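For instance, a rough sketch of a stateful operator that keeps a stream-lib StreamSummary (the Space-Saving top-k structure) in Flink's keyed state — the class name, the capacity of 10,000, and k = 10 are made up for illustration, and the summary falls back to Flink's generic (Kryo) serialization:

```java
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Keeps a stream-lib StreamSummary (Space-Saving top-k) per key in Flink
 * managed state. Must be applied on a keyed stream.
 */
public class TopKWords extends RichFlatMapFunction<String, String> {

    private transient ValueState<StreamSummary<String>> summaryState;

    @Override
    public void open(Configuration parameters) {
        summaryState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "top-k-summary",
                TypeInformation.of(new TypeHint<StreamSummary<String>>() {})));
    }

    @Override
    public void flatMap(String word, Collector<String> out) throws Exception {
        StreamSummary<String> summary = summaryState.value();
        if (summary == null) {
            // The capacity bounds both memory usage and the approximation error.
            summary = new StreamSummary<>(10_000);
        }
        summary.offer(word);
        summaryState.update(summary);

        // Emit the current estimated top 10; a real job would typically do
        // this on a timer or at the end of a window rather than per record.
        for (Counter<String> c : summary.topK(10)) {
            out.collect(c.getItem() + ": " + c.getCount());
        }
    }
}
```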
> On 9 June 2016 at 15:29, Yukun Guo <gyk....@gmail.com> wrote:
>
> Thank you very much for the detailed answer. Now I understand that a DataStream can be repartitioned or "joined" (I don't know the exact terminology) with keyBy.
>
> But another question: despite the non-existence of an incremental top-k algorithm, I'd like to incrementally compute the local word count during one hour, probably using a TreeMap for counting. As soon as the hour finishes, the TreeMap is converted into a stream of Tuple2 and forwarded to the remaining computation. I'm concerned about the memory usage: the TreeMap and the Tuple2 collection hold a huge number of items; do I have to do some custom memory management?
>
> I'm also not sure whether a TreeMap is suitable here. This StackOverflow question presents a similar approach: http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data, but the suggested solution seems rather complicated.
>
> On 8 June 2016 at 08:04, Jamie Grier <ja...@data-artisans.com> wrote:
>
> Suggestions in-line below...
>
> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk....@gmail.com> wrote:
>
> Hi,
>
> I'm working on a project which uses Flink to compute hourly log statistics like top-k. The logs are fetched from Kafka by a FlinkKafkaConsumer and packed into a DataStream.
>
> The problem is, I find the computation quite challenging to express with Flink's DataStream API:
>
> 1. If I use something like `logs.timeWindow(Time.hours(1))`, and the data volume is really high, e.g., billions of logs generated in one hour, will the window grow too large to be handled efficiently?
>
> In the general case you can use:
>
>     stream
>         .timeWindow(...)
>         .apply(reduceFunction, windowFunction)
>
> which takes a ReduceFunction and a WindowFunction. The ReduceFunction reduces the state on the fly and thereby keeps the total state size low; this is commonly used in analytics applications to bound the state accumulated for each window. In the specific case of top-k, however, you cannot do this if you want an exact result: to get an exact result, I believe you have to keep around all of the data and calculate the top-k at the end in your WindowFunction. If you are able to use approximate algorithms for your use case, then you can compute a probabilistic, incremental top-k based on some sort of sketch-based algorithm.
>
> 2. We have to create a `KeyedStream` before applying `timeWindow`. However, the distribution of some keys is skewed, so keying by them may compromise performance due to unbalanced partition loads. (What I want is just to rebalance the stream across all partitions.)
>
> A good and simple way to approach this may be to come up with a composite key for your data that *is* uniformly distributed. You can imagine something simple like 'natural_key:random_number'. Then keyBy(natural_key) and reduce() again. For example:
>
>     stream
>         .keyBy(key, rand())  // partition by a composite key that is uniformly distributed
>         .timeWindow(1 hour)
>         .reduce()            // pre-aggregation
>         .keyBy(key)          // repartition by the natural key
>         .timeWindow(1 hour)
>         .reduce()            // final aggregation
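As a concrete sketch of this two-stage pattern against the Java DataStream API of that era — the tuple shapes, the fan-out of 16 sub-keys, and the class name are placeholders, not part of the thread above:

```java
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SkewedWordCount {

    /** words: (word, 1L) pairs; returns hourly (word, count) pairs. */
    public static DataStream<Tuple2<String, Long>> hourlyCounts(
            DataStream<Tuple2<String, Long>> words) {

        return words
            // Attach a random sub-key (0..15) as a field, so that hot words
            // spread across partitions while the key selector stays
            // deterministic with respect to the record.
            .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
                @Override
                public Tuple3<String, Integer, Long> map(Tuple2<String, Long> t) {
                    return Tuple3.of(t.f0, ThreadLocalRandom.current().nextInt(16), t.f1);
                }
            })
            // Stage 1: pre-aggregate per (word, sub-key) composite key.
            .keyBy(new KeySelector<Tuple3<String, Integer, Long>, String>() {
                @Override
                public String getKey(Tuple3<String, Integer, Long> t) {
                    return t.f0 + ":" + t.f1;
                }
            })
            .timeWindow(Time.hours(1))
            .reduce(new ReduceFunction<Tuple3<String, Integer, Long>>() {
                @Override
                public Tuple3<String, Integer, Long> reduce(
                        Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
                    return Tuple3.of(a.f0, a.f1, a.f2 + b.f2);
                }
            })
            // Drop the sub-key again.
            .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(Tuple3<String, Integer, Long> t) {
                    return Tuple2.of(t.f0, t.f2);
                }
            })
            // Stage 2: final aggregation per natural key.
            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> t) {
                    return t.f0;
                }
            })
            .timeWindow(Time.hours(1))
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                   Tuple2<String, Long> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            });
    }
}
```

Note that the random sub-key is assigned once in a map and then keyed on as a plain field; keying directly on a call to rand() would make the key selector non-deterministic, which Flink does not allow.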
> 3. The top-k algorithm can be straightforwardly implemented with `DataSet`'s `mapPartition` and `reduceGroup` API as in [FLINK-2549](https://github.com/apache/flink/pull/1161/), but it is not so easy with the DataStream approach, even with the stateful operators. I still cannot figure out how to re-union streams once they are partitioned.
>
> I'm not sure I know what you're trying to do here. What do you mean by re-union?
>
> 4. Is it possible to convert a DataStream into a DataSet? If yes, how can I make Flink analyze the data incrementally rather than aggregating the logs for one hour before starting to process them?
>
> There is no direct way to turn a DataStream into a DataSet. I addressed the point about doing the computation incrementally above, though: you do this with a ReduceFunction. But again, there is no exact incremental top-k algorithm that I'm aware of. It can be done with sketching, though.
>
> --
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
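For completeness, a hedged sketch of the exact route discussed above: once the hourly per-word counts exist (e.g., from the two-stage job sketched earlier), a non-keyed all-window can pick the exact top k with a small heap, since it only sees one record per word per hour rather than the raw logs. The class and method names are again illustrative:

```java
import java.util.PriorityQueue;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HourlyTopK {

    /** counts: hourly (word, count) pairs; emits the k largest per hour. */
    public static DataStream<Tuple2<String, Long>> topK(
            DataStream<Tuple2<String, Long>> counts, final int k) {

        return counts
            // Runs with parallelism 1, but over per-word aggregates only,
            // not over the billions of raw log records.
            .timeWindowAll(Time.hours(1))
            .apply(new AllWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window,
                                  Iterable<Tuple2<String, Long>> values,
                                  Collector<Tuple2<String, Long>> out) {
                    // Keep the k largest counts with a min-heap of size k.
                    PriorityQueue<Tuple2<String, Long>> heap =
                            new PriorityQueue<>(k, (a, b) -> Long.compare(a.f1, b.f1));
                    for (Tuple2<String, Long> v : values) {
                        heap.add(v);
                        if (heap.size() > k) {
                            heap.poll(); // drop the current minimum
                        }
                    }
                    // Emit the k winners (in no particular order).
                    for (Tuple2<String, Long> v : heap) {
                        out.collect(v);
                    }
                }
            });
    }
}
```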