Hi Aaron

You cannot set up a MapState whose value is a ListState, but you can use a
plain list as its value. However, I think that would still suffer from
serialization/deserialization overhead when appending to the list value.

What is the KEY in your map state? If you could use the emoji as your KEY, you
could do something like this:

After keyBy, we first process a 🐿, so the map state becomes <🐿, 1>. Then we
process another 🐿 and get <🐿, 2>. Lastly, we process a 🦞 and look up the map
to learn that we have already queued two 🐿. At that point we could emit
{🐿, 🦞} and update the map entry to <🐿, 1>. If you can follow this logic, the
serialization/deserialization cost of Seq<VALUE> could be greatly reduced,
because only a counter is stored per key instead of a whole list.
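
To make the counting idea concrete, here is a minimal sketch in plain Java. It is only an illustration under several assumptions: a HashMap stands in for Flink's MapState<String, Integer>, the class and method names (CountingJoinSketch, process, queued) are hypothetical, the strings "squirrel", "lobster", "unicorn" and "dragon" stand for the emoji, and one pair is emitted per arriving value as in the walkthrough above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for Flink's MapState<String, Integer>.
class CountingJoinSketch {
    // Which value each waiting record type needs (assumed pairing from the example).
    private final Map<String, String> waitsFor = new HashMap<>();
    // Number of queued records per waiting type, stored instead of a Seq<VALUE>.
    private final Map<String, Integer> queuedCount = new HashMap<>();

    CountingJoinSketch() {
        waitsFor.put("squirrel", "lobster");
        waitsFor.put("unicorn", "dragon");
    }

    // Processes one record and returns the pairs emitted for it.
    List<String> process(String record) {
        List<String> out = new ArrayList<>();
        if (waitsFor.containsKey(record)) {
            // A waiting record: only bump its counter, nothing is emitted.
            queuedCount.merge(record, 1, Integer::sum);
            return out;
        }
        // An awaited value: look for waiting types that have something queued.
        for (Map.Entry<String, String> e : waitsFor.entrySet()) {
            String waiting = e.getKey();
            int count = queuedCount.getOrDefault(waiting, 0);
            if (e.getValue().equals(record) && count > 0) {
                out.add("{" + waiting + "," + record + "}");
                // Decrement as in the walkthrough: <squirrel, 2> becomes <squirrel, 1>.
                if (count == 1) {
                    queuedCount.remove(waiting);
                } else {
                    queuedCount.put(waiting, count - 1);
                }
            }
        }
        return out;
    }

    // Returns how many records of the given type are still queued.
    int queued(String type) {
        return queuedCount.getOrDefault(type, 0);
    }
}
```

In a real job the two maps would presumably live in keyed state inside something like a KeyedProcessFunction, and the emitted pairs would go to the collector. This only works if the queued records of one type are interchangeable, so that a count is enough to rebuild the output.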

Best
Yun Tang
________________________________
From: Aaron Langford <aaron.langfor...@gmail.com>
Sent: Wednesday, December 18, 2019 6:47
To: user@flink.apache.org <user@flink.apache.org>
Subject: MapState with List Type for values

Hello Flink Community,

I have a question about using MapState with lists as values. Here is a 
description of the use case:

I have an operator over a keyed stream where for each record that comes, it 
needs to look into some state to determine if a value has arrived or not. If 
the value has not arrived yet, the record needs to be en-queued for when the 
value will eventually arrive. When that value does arrive, the queued records 
need to be flushed, and associated list state cleared.

Here's an emoji representation of the stream:
---------------------------------------------------------------------
🐿🦞🐿🦄🐿🦄🦄🐉 ...
---------------------------------------------------------------------
Where 🐿 must wait for at least 1 🦞 to be output (otherwise be queued) and 🦄
must wait for 🐉 to be output (otherwise be queued). If you can't tell, this is
basically/sort of a join without a window.

The output should be something abstractly like this:

---------------------------------------------------------------------
{🐿,🦞}{🐿,🦞}{🐿,🦞}{🦄,🐉}{🦄,🐉}{🦄,🐉}...
---------------------------------------------------------------------

Many records might be en-queued while waiting for a given value. Many records 
may be waiting for many different values, but any record that is en-queued will 
only be waiting for a record of one type to show up.

Records on this stream are keyed by some shared parent key, and I have reasons 
to avoid keying the stream on the "join key" as it were. Namely I'm getting a 
CDC stream with a lot of different tables, and I want to avoid a topology with 
N operators for N different tables if I can.

If I lean on MapState<KEY, Seq<VALUE>> to get this done for me, then my job 
suffers considerably in terms of performance. I believe one of the biggest 
bottlenecks is that for each time I need to interact with a Seq<VALUE> (like 
for appends), I must deserialize the entire list.

Is there a way to set up a MapState whose value is a ListState? Or is there any 
guidance for how I might serialize/deserialize a list type in the MapState in 
such a way that appends aren't so expensive? Open to other 
suggestions/approaches as well.

Aaron
