Re: load + update global state

2017-08-07 Thread Tzu-Li (Gordon) Tai
Hi Peter!

One thing I’d like to understand first after reading about your use case:
Why exactly do you need the lookup table to be globally accessible? From what I 
understand, you are using this lookup table for stream event enrichment, so 
whatever processing you perform downstream on the enriched stream would already 
have the corresponding information for each session attached.

Regarding a solution for efficient stream enriching in your case:
In your case, the enrichment data comes from the input events themselves, so it 
can be fairly straightforward: use a RichMapFunction (or RichFlatMapFunction) on 
a stream keyed by session id, and keep the lookup table as managed keyed state [1].
By using RocksDB as your state backend [2], the table would not be held in 
memory, so your state size is bounded only by disk size. Each state access is 
scoped to the currently processed key (in your case, the session id), meaning 
you'd only ever be accessing the email set of that one session.
With RocksDB as the state backend, each state access and update requires 
de-/serialization (of the state of a single key only), but that would always be 
a local access and in general would outperform remote lookups against an 
external store.
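
To make this concrete, here is a minimal sketch of what such an operator could 
look like in Scala. The Request/EnrichedRequest event types and their fields are 
made-up placeholders for your actual events, not anything from your pipeline:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    // Placeholder event types for the example.
    case class Request(sessionId: String, email: Option[String])
    case class EnrichedRequest(request: Request, knownEmails: Set[String])

    class EnrichFunction extends RichFlatMapFunction[Request, EnrichedRequest] {

      // Managed keyed state: the email set of the current key (= session id).
      @transient private var emails: ValueState[Set[String]] = _

      override def open(parameters: Configuration): Unit = {
        emails = getRuntimeContext.getState(
          new ValueStateDescriptor[Set[String]]("emails", classOf[Set[String]]))
      }

      override def flatMap(in: Request, out: Collector[EnrichedRequest]): Unit = {
        val known = Option(emails.value()).getOrElse(Set.empty[String])

        // Enrich the current element with the emails seen so far in this session.
        out.collect(EnrichedRequest(in, known))

        // Fold newly discovered emails back into the lookup state.
        in.email.foreach(e => emails.update(known + e))
      }
    }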

So, to wrap this up, the answers to your three questions, when using Flink, would be:

(1) load the state as a whole from the data store into memory is a huge burn of 
memory (also making changes cluster-wide visible is an issue) 
Apart from the “cluster-wide visibility” aspect, which still needs to be 
clarified, you can use RocksDB as the state backend so that the state is kept 
on disk rather than in memory.
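
Configuring that is essentially a one-liner; a sketch (the checkpoint URI is a 
placeholder, point it at your own durable filesystem):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Keyed state now lives in RocksDB on local disk instead of on the heap;
    // checkpoints are written to the given (placeholder) filesystem path.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))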

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?) 
Remote lookups are not required if you keep the lookup table as managed keyed 
state in Flink. All session lookups would then be local state accesses. You can 
think of it as setting up a K-V store within Flink that is always co-partitioned 
with your incoming events by session id.
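
With the sketch from above, that co-partitioning is just the keyBy on the 
session id (requests being a DataStream[Request] of the placeholder type):

    // Route each event to the task instance that holds its session's state;
    // every lookup and update in EnrichFunction is then purely local.
    val enriched = requests
      .keyBy(_.sessionId)
      .flatMap(new EnrichFunction)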

(3) how should I integrate the changes to the table with flink's checkpointing? 
Simply by registering managed keyed state. Flink will handle checkpointing it 
for fault tolerance for you, ensuring exactly-once state semantics. The “Working 
with State” docs [1] should cover that quite well!


Hope this helps :)

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-keyed-state
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/state_backends.html#the-rocksdbstatebackend


On 8 August 2017 at 3:00:57 AM, Peter Ertl (peter.e...@gmx.net) wrote:

Hi folks,  

I am coding a streaming task that processes http requests from our web site and 
enriches these with additional information.  

The lookup table contains session ids from historic requests and the emails 
that were used within these sessions in the past.


lookup - hashtable: session_id: String => emails: Set[String]  


During processing of each NEW http request

- the lookup table should be used to get previous emails and enrich the current 
stream item  
- new candidates for the lookup table will be discovered while processing these 
items and should be added to the lookup table (also, these changes should be 
visible throughout the cluster)

I see at least the following issues:  

(1) load the state as a whole from the data store into memory is a huge burn of 
memory (also making changes cluster-wide visible is an issue)  

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?)  

(3) how should I integrate the changes to the table with flink's checkpointing? 
 

I really can't figure out how best to solve this, and my current solution is 
far from elegant.

So, is there any best practice for supporting "large lookup tables that change 
during stream processing"?

Cheers  
Peter