If you use a KeyedStream you can group records by key (city id) and then use a RichFlatMapFunction to aggregate state in a MapState or ListState per key. That operator can then emit the updated result downstream as a new aggregated record, or write it to a database or similar as you see fit. A rough sketch of the idea is below.
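
Here is a minimal, illustrative sketch of that approach. It assumes the incoming facts are Tuple3<String, Long, String> of (ADD/DEL, cityId, name); the class name, field layout, and output type are just assumptions for the example, not anything from your pipeline:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Keeps the current set of names per city id and emits the full set after each update. */
public class CityNameAggregator
        extends RichFlatMapFunction<Tuple3<String, Long, String>, Tuple2<Long, List<String>>> {

    // Keyed state: Flink scopes one MapState instance to each key (city id).
    private transient MapState<String, Boolean> names;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Boolean> desc =
                new MapStateDescriptor<>("city-names", String.class, Boolean.class);
        names = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void flatMap(Tuple3<String, Long, String> fact,
                        Collector<Tuple2<Long, List<String>>> out) throws Exception {
        if ("ADD".equals(fact.f0)) {
            names.put(fact.f2, Boolean.TRUE);
        } else if ("DEL".equals(fact.f0)) {
            names.remove(fact.f2);
        }
        // Emit the updated aggregate; alternatively, write it to an external sink here instead.
        List<String> current = new ArrayList<>();
        for (String name : names.keys()) {
            current.add(name);
        }
        out.collect(Tuple2.of(fact.f1, current));
    }
}

You would apply it on the keyed stream roughly like facts.keyBy(fact -> fact.f1).flatMap(new CityNameAggregator()), and then attach whatever sink (HBase, files, etc.) you want for the aggregated records.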
Michael

> On May 8, 2018, at 4:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Hi all,
> I'd like to introduce an efficient way to aggregate incoming data around an entity in our pipeline.
>
> Essentially, new incoming facts are added to (but potentially also removed from) an entity (by id). For example, when we receive a new name for a city, we add this name to the known names of that city id (if the first field of the tuple is ADD; if it is DEL we remove it).
> At the moment we use a batch job to generate an initial version of the entities, another job that adds facts to this initial version, and another one that merges the base and the computed data. This is quite inefficient in terms of speed and disk space, because every step requires materializing the data on disk.
>
> I was wondering whether Flink could help here or not... there are a couple of requirements that make things very complicated:
> - states could potentially be large (a lot of data related to an entity). Is there any limitation on the size of the states?
> - data must be readable by a batch job. If I'm not wrong, this could easily be solved by flushing data periodically to an external sink (like HBase or similar)
> - how to keep the long-running streaming job up and run a batch job at the same time? Will this be possible after FLIP-6?
> - how to ingest new data? Do I really need Kafka, or can I just add new datasets to a staging HDFS dir (and move them to another dir once ingested)?
> Best,
> Flavio