We have a similar use case, except with BigtableIO instead of HBase.

We ended up building a custom transform that was basically
PCollection[ByteString] -> PCollection[BigtableRow] and fetched rows from
Bigtable based on the input. It's tricky to get right, though, because of
batching, etc.
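
To make the batching point concrete, here is a rough sketch of the
buffering logic in plain Java, not our actual transform. BatchingLookup,
bulkRead and output are hypothetical names; bulkRead stands in for whatever
multi-row read the client offers. In a Beam DoFn, add() would be called
from @ProcessElement and flush() from @FinishBundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Buffers keys and looks them up in batches instead of one read per key. */
class BatchingLookup<K, V> {
  private final int maxBatchSize;
  // Hypothetical stand-in for a bulk read against the store.
  private final Function<List<K>, Map<K, V>> bulkRead;
  // Receives each fetched row; in a DoFn this would emit to the output.
  private final Consumer<Map.Entry<K, V>> output;
  private final List<K> buffer = new ArrayList<>();

  BatchingLookup(
      int maxBatchSize,
      Function<List<K>, Map<K, V>> bulkRead,
      Consumer<Map.Entry<K, V>> output) {
    this.maxBatchSize = maxBatchSize;
    this.bulkRead = bulkRead;
    this.output = output;
  }

  /** Called once per input key; triggers a read when the buffer is full. */
  void add(K key) {
    buffer.add(key);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Must also be called at the end of a bundle, or the tail is lost. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Copy the buffer so the reader never sees it being cleared.
    Map<K, V> rows = bulkRead.apply(new ArrayList<>(buffer));
    buffer.clear();
    rows.entrySet().forEach(output);
  }
}
```

The part that is easy to miss is the final flush: without it, any keys
left in a partially filled buffer are silently dropped.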

I'm curious how a StatefulDoFn would help here. It seems like it's more of
a cache than an actual join (and in my use case we never read a key more
than once, so a cache wouldn't help anyway). I'd also be interested to see
how the state storage performs with "large" amounts of state. We're reading
~1 TB of data from Bigtable in a run, and it doesn't seem reasonable to
store all of that in a DoFn's state.

On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik <lc...@google.com> wrote:

> What about a StatefulDoFn where you append the value(s) in a bag state as
> you see them?
>
> If you need to seed the state information, you could do a one time lookup
> in processElement for each key to HBase if the key hasn't yet been seen
> (storing the fact that you loaded the data in a boolean) but afterwards you
> would rely on reading the value(s) from the bag state.
>
> processElement(...) {
>   Value newValue = ...
>   Iterable<Value> values;
>   // read() returns null until the state has been set for this key
>   if (!Boolean.TRUE.equals(hasSeenKeyBooleanValueState.read())) {
>     values = ... load from HBase ...
>     // BagState adds one element at a time
>     for (Value v : values) {
>       valuesBagState.add(v);
>     }
>     hasSeenKeyBooleanValueState.write(true);
>   } else {
>     values = valuesBagState.read();
>   }
>   ... perform processing using values ...
>
>   valuesBagState.add(newValue);
> }
>
> This blog post[1] has a good example.
>
> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas <pela.chan...@gmail.com>
> wrote:
>
>> Hello All,
>> I have a use case where I have PCollection<KV<Key,Value>> data coming
>> from Kafka source. When processing each record (KV<Key,Value>) I need all
>> old values for that Key stored in an HBase table. The naive approach is to
>> do an HBase lookup in DoFn.processElement. I considered a side input, but
>> it's not going to work because of the large dataset. Any suggestions?
>>
>> Thanks,
>> Chandan
>>
>
