Hi Peter!
One thing I’d like to understand first, after reading about your use case:
Why exactly do you need the lookup table to be globally accessible? From what I
understand, you are using this lookup table for stream event enrichment, so
whatever processing you need to perform downstream on this enriched stream
would already have the corresponding information for each session attached.
Regarding a solution for efficient stream enrichment in your case:
Since the enrichment data comes from the input events themselves, this can be
fairly straightforward: use a MapFunction that keeps the lookup table as
managed keyed state [1].
By using RocksDB as your state backend [2], the table would not be held in
memory, so your state size is bounded only by disk size. Each state access is
scoped to the currently processed key (in your case the session id), meaning
you’d only ever be accessing the email set of that session.
With RocksDB, each state access and update requires de-/serialization (of the
state of a single key), but that is always local access and will in general
outperform looking up a remote external store.
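To make the pattern a bit more concrete, here is a rough, Flink-free sketch of the per-key logic in plain Scala (the mutable map simulates Flink’s managed keyed state, and all names are made up for illustration):

```scala
// Conceptual sketch of keyed-state enrichment, without Flink.
// In a real job, emailsBySession would be managed keyed state inside a
// rich function, scoped automatically to the current session id.

case class Request(sessionId: String, email: Option[String], payload: String)
case class Enriched(request: Request, knownEmails: Set[String])

// Stand-in for managed keyed state: one Set[String] per session id.
val emailsBySession = scala.collection.mutable.Map
  .empty[String, Set[String]]
  .withDefaultValue(Set.empty[String])

def enrich(req: Request): Enriched = {
  // State access is scoped to the current key (the session id):
  val known = emailsBySession(req.sessionId)
  // Newly discovered emails are folded back into the lookup table.
  req.email.foreach(e => emailsBySession(req.sessionId) = known + e)
  Enriched(req, known)
}
```

In the actual job you’d key the stream by session id and obtain the state from the RuntimeContext inside a rich function; Flink then scopes, partitions, and checkpoints it for you.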
So, to wrap this up, the answers to your questions, when using Flink, would be:
(1) loading the state as a whole from the data store into memory is a huge
burden on memory (also making changes cluster-wide visible is an issue)
Apart from the “cluster-wide visibility” aspect, which needs to be clarified,
you can use RocksDB as the state backend so that the state is not kept in
memory.
(2) not loading into memory, but using something like Cassandra / Redis as a
lookup store, would certainly work but introduces a lot of network requests
(possible ideas: use a distributed cache? broadcast updates in the Flink
cluster?)
Remote lookups are not required if you keep the lookup store as managed keyed
state in Flink. All session lookups would be local state accesses. You can
think of it as setting up a K-V store within Flink that is always
co-partitioned with your incoming events by session id.
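Loosely illustrated (plain Scala again, with simple hash partitioning instead of Flink’s key groups, and all names invented): each parallel instance owns exactly the state slice for the keys routed to it, so a lookup never leaves the operator.

```scala
// Simplified picture of co-partitioning by session id.
// Flink routes both the event and its keyed state to the same
// parallel subtask, so every lookup is a local read.

val parallelism = 4

def subtaskFor(sessionId: String): Int =
  math.abs(sessionId.hashCode % parallelism)

// Each subtask owns only its own slice of the lookup table:
val statePerSubtask =
  Array.fill(parallelism)(scala.collection.mutable.Map.empty[String, Set[String]])

def lookup(sessionId: String): Set[String] =
  statePerSubtask(subtaskFor(sessionId)).getOrElse(sessionId, Set.empty)

def addEmail(sessionId: String, email: String): Unit = {
  val slice = statePerSubtask(subtaskFor(sessionId))
  slice(sessionId) = slice.getOrElse(sessionId, Set.empty) + email
}
```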
(3) how should I integrate the changes to the table with Flink's checkpointing?
Simply by registering managed keyed state. Flink will handle checkpointing it
for fault tolerance and ensure exactly-once state semantics for you. The
“Working with State” docs [1] hopefully cover that quite well!
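Very loosely, here is what a checkpoint buys you for that state (plain Scala, grossly simplified; Flink does this consistently across the whole job, and replays the input since the last checkpoint on recovery):

```scala
// Grossly simplified sketch of checkpoint-and-restore for keyed state.

var state = Map("s1" -> Set("a@x.com"))

// Checkpoint: take a consistent snapshot of the state.
val checkpoint = state

// Processing continues and updates the state...
state = state.updated("s1", state("s1") + "b@x.com")

// ...then a failure happens; recovery restores the snapshot,
// and the events processed since the checkpoint are replayed.
state = checkpoint
```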
Hope this helps :)
Cheers,
Gordon
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/state_backends.html#the-rocksdbstatebackend
On 8 August 2017 at 3:00:57 AM, Peter Ertl (peter.e...@gmx.net) wrote:
Hi folks,
I am coding a streaming task that processes HTTP requests from our web site and
enriches them with additional information.
The lookup table contains session ids from historic requests and the related
emails that were used within these sessions in the past.
lookup - hashtable: session_id: String => emails: Set[String]
During processing of these NEW http requests
- the lookup table should be used to get previous emails and enrich the current
stream item
- new candidates for the lookup table will be discovered during processing of
these items and should be added to the lookup table (these changes should also
be visible throughout the cluster)
I see at least the following issues:
(1) loading the state as a whole from the data store into memory is a huge
burden on memory (also making changes cluster-wide visible is an issue)
(2) not loading into memory, but using something like Cassandra / Redis as a
lookup store, would certainly work but introduces a lot of network requests
(possible ideas: use a distributed cache? broadcast updates in the Flink
cluster?)
(3) how should I integrate the changes to the table with Flink's checkpointing?
I really don't see how best to solve this, and my current solution is far from
elegant.
So is there any best practice for supporting "large lookup tables that change
during stream processing" ?
Cheers
Peter