We have a similar use case, except with BigtableIO instead of HBase. We ended up building a custom transform that was basically PCollection[ByteString] -> PCollection[BigtableRow], fetching rows from Bigtable based on the input keys. It's tricky to get right, though, mostly because of batching, etc.
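
For reference, the shape of it is roughly the sketch below (simplified, not our actual transform; the project/instance/table IDs are placeholders, and the real version buffers keys and issues batched reads per bundle with retries, which is where most of the complexity lives):

import java.io.IOException;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.DoFn;

// Simplified sketch of the lookup DoFn: one point read per element.
// The real transform batches keys per bundle and handles retries/errors.
class BigtableLookupFn extends DoFn<ByteString, Row> {
  private transient BigtableDataClient client;

  @Setup
  public void setup() throws IOException {
    // Placeholder project/instance IDs.
    client = BigtableDataClient.create("my-project", "my-instance");
  }

  @ProcessElement
  public void processElement(@Element ByteString key, OutputReceiver<Row> out) {
    // Point read; keys with no matching row are simply dropped.
    Row row = client.readRow("my-table", key);
    if (row != null) {
      out.output(row);
    }
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
    }
  }
}
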
I'm curious how a StatefulDoFn would help here; it seems like it's more of a cache than an actual join (and in my use case we never read a key more than once, so a cache wouldn't help anyway). I'd also be interested to see how the state storage performs with "large" amounts of state. We're reading ~1 TB of data from Bigtable in a run, and it doesn't seem reasonable to store all of that in a DoFn's state.

On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik <lc...@google.com> wrote:

> What about a StatefulDoFn where you append the value(s) in a bag state as
> you see them?
>
> If you need to seed the state information, you could do a one-time lookup
> in processElement for each key to HBase if the key hasn't yet been seen
> (storing the fact that you loaded the data in a boolean), but afterwards you
> would rely on reading the value(s) from the bag state.
>
> processElement(...) {
>   Value newValue = ...
>   Iterable<Value> values;
>   if (!hasSeenKeyBooleanValueState.read()) {
>     values = ... load from HBase ...
>     valuesBagState.append(values);
>     hasSeenKeyBooleanValueState.set(true);
>   } else {
>     values = valuesBagState.read();
>   }
>   ... perform processing using values ...
>
>   valuesBagState.append(newValue);
> }
>
> This blog post[1] has a good example.
>
> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas <pela.chan...@gmail.com>
> wrote:
>
>> Hello All,
>> I have a use case where I have PCollection<KV<Key, Value>> data coming
>> from a Kafka source. When processing each record (KV<Key, Value>) I need
>> all of the old values for that Key stored in an HBase table. The naive
>> approach is to do an HBase lookup in DoFn.processElement. I considered a
>> side input, but it's not going to work because of the large dataset.
>> Any suggestions?
>>
>> Thanks,
>> Chandan
>>
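
For anyone following along, this is roughly what the suggested bag-state pattern looks like with Beam's Java state API (a sketch along the lines of the blog post Lukasz links; String values stand in for the real Value type, and loadFromHBase(...) is a placeholder, not real client code):

import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: seed per-key state from HBase the first time a key is seen,
// then serve later lookups for that key from the bag state.
class JoinWithHistoryFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {

  @StateId("seeded")
  private final StateSpec<ValueState<Boolean>> seededSpec =
      StateSpecs.value(BooleanCoder.of());

  @StateId("values")
  private final StateSpec<BagState<String>> valuesSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("seeded") ValueState<Boolean> seeded,
      @StateId("values") BagState<String> values,
      OutputReceiver<KV<String, Iterable<String>>> out) {

    // One-time seed of the bag from HBase for this key.
    if (!Boolean.TRUE.equals(seeded.read())) {
      for (String old : loadFromHBase(element.getKey())) {
        values.add(old);
      }
      seeded.write(true);
    }

    Iterable<String> history = values.read();
    // ... perform processing using history ...
    out.output(KV.of(element.getKey(), history));

    // Remember the new value for subsequent elements with the same key.
    values.add(element.getValue());
  }

  // Placeholder for the HBase (or Bigtable) lookup of old values.
  private List<String> loadFromHBase(String key) {
    return Collections.emptyList();
  }
}
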