You should have a look at this project: https://github.com/addthis/stream-lib

You can use it within Flink, storing intermediate values in a local state.
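
To make the idea concrete, here is a minimal stand-alone sketch of one sketch-style algorithm for approximate top-K (Space-Saving). The class `SpaceSavingSketch` and its methods are hypothetical names made up for illustration; this is not stream-lib's API, and a library implementation would track the minimum far more efficiently than the linear scan used here. In a Flink job, an object like this would be the per-key or per-window state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of Space-Saving; not part of Flink or stream-lib. */
class SpaceSavingSketch {
    private final int capacity;                       // max number of tracked items
    private final Map<String, Long> counters = new HashMap<>();

    SpaceSavingSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Records one occurrence of item using at most `capacity` counters. */
    void offer(String item) {
        Long current = counters.get(item);
        if (current != null) {
            counters.put(item, current + 1);
        } else if (counters.size() < capacity) {
            counters.put(item, 1L);
        } else {
            // Table is full: evict the smallest counter. The newcomer inherits
            // that count plus one, so counts may be over-estimated, never under.
            Map.Entry<String, Long> min = null;
            for (Map.Entry<String, Long> e : counters.entrySet()) {
                if (min == null || e.getValue() < min.getValue()) {
                    min = e;
                }
            }
            String evictedKey = min.getKey();
            long inherited = min.getValue() + 1;
            counters.remove(evictedKey);
            counters.put(item, inherited);
        }
    }

    /** Returns up to k tracked items, highest estimated count first. */
    List<Map.Entry<String, Long>> topK(int k) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counters.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return new ArrayList<>(entries.subList(0, Math.min(k, entries.size())));
    }
}
```

Memory is bounded by `capacity` entries no matter how many distinct words arrive, which is the trade-off against exactness.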

> On 9 June 2016 at 15:29, Yukun Guo <gyk....@gmail.com> wrote:
> 
> Thank you very much for the detailed answer. Now I understand a DataStream 
> can be repartitioned or “joined” (don’t know the exact terminology) with 
> keyBy.
> 
> But another question: 
> Although there is no exact incremental top-k algorithm, I’d like to 
> incrementally compute the local word count during one hour, probably using a 
> TreeMap for counting. As soon as the hour finishes, the TreeMap is converted 
> to a stream of Tuple2 and forwarded to the remaining computation thereafter. 
> I’m concerned about the memory usage: the TreeMap and the Tuple2 collection 
> will hold a huge number of items. Do I have to do some custom memory management?
> 
> I’m also not sure whether a TreeMap is suitable here. This StackOverflow 
> question presents a similar approach: 
> http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
> but the suggested solution seems rather complicated.
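
On the TreeMap question: a TreeMap orders its entries by key (the word), which does not help when ranking by count, so a plain HashMap is enough for the counting step. The following is a minimal sketch under that assumption; `HourlyWordCount` is a hypothetical name, and the code is plain Java rather than a Flink operator. It holds one entry per distinct word (not one per log line) and extracts the exact top-K with a min-heap of size k instead of sorting everything.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical illustration: exact counts in a HashMap, top-K via a bounded min-heap. */
class HourlyWordCount {
    private final Map<String, Long> counts = new HashMap<>();

    /** Incrementally counts one occurrence of word. */
    void add(String word) {
        counts.merge(word, 1L, Long::sum);
    }

    /** Returns the k most frequent words, highest count first. */
    List<Map.Entry<String, Long>> topK(int k) {
        // Min-heap ordered by count: the root is the weakest current candidate.
        PriorityQueue<Map.Entry<String, Long>> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // drop the smallest so the heap never exceeds k entries
            }
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return result;
    }
}
```

The map still grows with the number of distinct words in the hour; if that is too large, an approximate sketch with bounded memory is the alternative.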
> 
> 
> On 8 June 2016 at 08:04, Jamie Grier <ja...@data-artisans.com> wrote:
> Suggestions in-line below...
> 
> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk....@gmail.com> wrote:
> Hi,
> 
> I'm working on a project which uses Flink to compute hourly log statistics
> like top-K. The logs are fetched from Kafka by a FlinkKafkaConsumer and packed
> into a DataStream.
> 
> The problem is, I find the computation quite challenging to express with
> Flink's DataStream API:
> 
> 1. If I use something like `logs.timeWindow(Time.hours(1))`, suppose that the
> data volume is really high, e.g., billions of logs might be generated in one
> hour, will the window grow too large to be handled efficiently?
> 
> In the general case you can use:
> 
>     stream
>         .timeWindow(...)
>         .apply(reduceFunction, windowFunction)
> 
> which can take a ReduceFunction and a WindowFunction.  The ReduceFunction is 
> used to reduce the state on the fly and thereby keep the total state size 
> low.  This can commonly be used in analytics applications to reduce the state 
> size that you're accumulating for each window.  In the specific case of TopK, 
> however, you cannot do this if you want an exact result.  To get an exact 
> result I believe you have to actually keep around all of the data and then 
> calculate TopK at the end in your WindowFunction.  If you are able to use 
> approximate algorithms for your use case then you can calculate a 
> probabilistic incremental TopK based on some sort of sketch-based algorithm.
> 
> 2. We have to create a `KeyedStream` before applying `timeWindow`. However,
> the distribution of some keys is skewed, so using them may compromise
> performance due to unbalanced partition loads. (What I want is just to
> rebalance the stream across all partitions.)
> 
> A good and simple way to approach this may be to come up with a composite key 
> for your data that *is* uniformly distributed.  You can imagine something 
> simple like 'natural_key:random_number'.  Key by that composite key and 
> reduce() to pre-aggregate, then keyBy(natural_key) and reduce() again.  For 
> example:
> 
>     stream
>         .keyBy(key, rand())      // partition by a uniformly distributed composite key
>         .timeWindow(1 hour)
>         .reduce()                // pre-aggregation
>         .keyBy(key)              // repartition by the natural key
>         .timeWindow(1 hour)
>         .reduce()                // final aggregation
>  
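
The two-stage pattern above can be illustrated outside Flink with plain collections. This is only a sketch of the idea: `SaltedAggregation`, `preAggregate`, `finalAggregate`, and the `buckets` parameter are hypothetical names, counting stands in for whatever reduce() would compute, and the salt is written into the key string once per record so that the composite key stays stable afterwards.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical illustration of salted (composite-key) two-stage aggregation. */
class SaltedAggregation {

    /** Stage 1: count per composite key "natural_key:salt", salt in [0, buckets). */
    static Map<String, Long> preAggregate(List<String> keys, int buckets, Random rng) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            String salted = key + ":" + rng.nextInt(buckets);
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and combine the partial counts per natural key. */
    static Map<String, Long> finalAggregate(Map<String, Long> partial) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String salted = e.getKey();
            // The salt follows the last ':' so natural keys may contain ':' themselves.
            String natural = salted.substring(0, salted.lastIndexOf(':'));
            totals.merge(natural, e.getValue(), Long::sum);
        }
        return totals;
    }
}
```

After the first stage a hot key contributes at most `buckets` partial records, so the second stage sees a small, bounded input per key even when the raw data is heavily skewed.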
> 
> 3. The top-K algorithm can be straightforwardly implemented with `DataSet`'s
> `mapPartition` and `reduceGroup` API as in
> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but not so easy if
> taking the DataStream approach, even with the stateful operators. I still
> cannot figure out how to reunion streams once they are partitioned.
> 
> I'm not sure I know what you're trying to do here.  What do you mean by 
> re-union?
>  
> 4. Is it possible to convert a DataStream into a DataSet? If yes, how can I
> make Flink analyze the data incrementally rather than aggregating the logs for
> one hour before starting to process?
> 
> There is no direct way to turn a DataStream into a DataSet.  I addressed the 
> point about doing the computation incrementally above, though.  You do this 
> with a ReduceFunction.  But again, there doesn't exist an exact incremental 
> TopK algorithm that I'm aware of.  This can be done with sketching, though.
> 
> 
> -- 
> 
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
> 
> 
