Hi Lukasz,

I've just given this a go with the state API and a stateful DoFn on the global window, as you suggested - it seems to work very well.
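For reference, the kind of stateful DoFn I mean looks roughly like this (just a sketch with made-up names and types, not my actual code):

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Keeps a running total per key; with no Window.into upstream it runs in the global window.
public class RunningTotalFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("total")
  private final StateSpec<ValueState<Long>> totalSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c,
                             @StateId("total") ValueState<Long> total) {
    Long current = total.read();                        // state.read()
    long updated = (current == null ? 0L : current) + c.element().getValue();
    total.write(updated);                               // state.write(...)
    c.output(KV.of(c.element().getKey(), updated));
  }
}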
I was just wondering how efficient it is when running on the Dataflow runner if, for example, several elements with the same key arrive within a few milliseconds of one another, e.g. (k1, v1), (k1, v2), (k1, v3) ..., and in my stateful DoFn's processElement method I am reading and updating the state via state.read() and state.write(...). Is it reading from and writing to an external store every time, or is it doing all of this in memory? I'm just wondering how it will scale for a higher-volume stream.

Thanks,
Josh

On Tue, Jun 6, 2017 at 11:18 PM, Josh <[email protected]> wrote:

> Ok I see, thanks Lukasz. I will try this out tomorrow.
> Sorry for the confusing question!
> Josh
>
> On Tue, Jun 6, 2017 at 10:01 PM, Lukasz Cwik <[email protected]> wrote:
>
>> Based upon your descriptions, it seemed like you wanted limited parallelism because of an external dependency.
>> Your best bet would be to use the global window combined with a StatefulDoFn. See this blog post (https://beam.apache.org/blog/2017/02/13/stateful-processing.html) about the StatefulDoFn.
>> You will not be able to use a different window function until after the StatefulDoFn, otherwise a GroupByKey may schedule your work on a different machine since the windows for a key may differ.
>>
>> Source -> StatefulDoFn -> Window.into(my other window type)
>>
>> All our sources currently operate within the global window until a Window.into happens, so there is no need to do Source -> Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window type).
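>> In code, that shape is roughly the following (sketch only - MyStatefulFn, the KV types and the one-hour window are placeholders for your own DoFn and windowing choice):
>>
>> // keyed input, still in the global window; the stateful step runs per key
>> PCollection<KV<String, MyEvent>> keyed = ...;
>> keyed
>>     .apply("Stateful", ParDo.of(new MyStatefulFn()))
>>     // assumes MyStatefulFn also outputs KV<String, MyEvent>; only re-window after the stateful step
>>     .apply("RewindowAfterState",
>>         Window.<KV<String, MyEvent>>into(FixedWindows.of(Duration.standardHours(1))));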
>> On Tue, Jun 6, 2017 at 12:03 PM, <[email protected]> wrote:
>>
>>> Hmm ok, I don't quite get why what I want to do isn't supported in Beam ... I don't actually have a limited-parallelism requirement, I just want to be able to partition my unbounded stream by a key determined from the elements, so that any two elements with the same key will be routed to the same worker. I want to do this because my DoFn keeps some in-memory cached state for each key (which I was planning to store at either the DoFn or JVM level). Does this sound like a bad idea?
>>>
>>> On 6 Jun 2017, at 19:14, Lukasz Cwik <[email protected]> wrote:
>>>
>>> You're right, the window acts as a secondary key within GroupByKey (KeyA,Window1 != KeyA,Window2), which means that each of those two composite keys can be scheduled to execute at the same time.
>>> At this point I think you should challenge your limited-parallelism requirement, as you'll need to build something outside of Apache Beam to provide these parallelization limits across windows (e.g. a lock within the same process when limiting yourself to a single machine, or a distributed lock service when dealing with multiple machines).
>>> The backlog of data is either going to grow infinitely at the GroupByKey or grow infinitely at the source if your pipeline can't keep up. It is up to the Runner to be smart and not produce a giant backlog at the GroupByKey since it knows how fast work is being completed (unfortunately I don't know of any Runner that is smart enough yet to push the backlog up to the source).
>>>
>>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <[email protected]> wrote:
>>>
>>>> I see, thanks for the tips!
>>>> Last question about this! How could this be adapted to work in an unbounded/streaming job? To work in an unbounded job, I need to put a Window.into with a trigger before the GroupByKey. I guess this would mean that the "shard gets processed by a single thread in MyDofn" guarantee would only apply to messages within a single window, and would not apply across windows? If this is the case, is there a better solution? I would like to avoid buffering data in windows, and want the shard guarantee to apply across windows.
>>>>
>>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Your code looks like what I was describing. My only comment would be to use a deterministic hashing function which is stable across JVM versions and JVM instances, as it will help in making your pipeline consistent across different runs/environments.
>>>>> Parallelizing across 8 instances instead of 4 would break the contract around GroupByKey (since it didn't group all the elements for a key correctly). Also, each element is the smallest unit of work, and specifically in your pipeline you have chosen to reduce all your elements into 4 logical elements (each containing some proportion of your original data).
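>>>>> For example, the key assignment in your snippet (quoted below) could be written something like this (sketch only): Math.floorMod keeps the shard number in [0, 4) even when hashCode() is negative, and String.hashCode() is specified by the JLS, so it is stable across JVM versions/instances as long as getKey().toString() is deterministic.
>>>>>
>>>>>     // deterministic, non-negative shard key in [0, 4)
>>>>>     .via((MyElement e) -> KV.of(
>>>>>         Math.floorMod(e.getKey().toString().hashCode(), 4), e))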
>>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <[email protected]> wrote:
>>>>>
>>>>>> Thanks for the reply, Lukasz.
>>>>>> What I meant was that I want to shard my data by a "shard key", and be sure that any two elements with the same "shard key" are processed by the same thread on the same worker. (Or, if that's not possible, the same worker JVM with no thread guarantee would be good enough.) It doesn't actually matter to me whether there are 1 or 4 or 100 DoFn instances processing the data.
>>>>>> It sounds like what you suggested will work for this, with the downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>>> It seems a bit long and messy, but am I right in thinking it would look like this? ...
>>>>>>
>>>>>> PCollection<MyElement> elements = ...;
>>>>>>
>>>>>> elements
>>>>>>     .apply(MapElements
>>>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>>>             TypeDescriptor.of(MyElement.class)))
>>>>>>         .via((MyElement e) -> KV.of(
>>>>>>             e.getKey().toString().hashCode() % 4, e)))
>>>>>>     .apply(GroupByKey.create())
>>>>>>     .apply(Partition.of(4,
>>>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
>>>>>>     .apply(ParDo.of(new MyDofn()));
>>>>>>
>>>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>>>>>
>>>>>> I was wondering, is there a guarantee that the runner won't parallelise the final MyDofn across e.g. 8 instances instead of 4? If there are two input elements with the same key, are they actually guaranteed to be processed on the same instance?
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> I think this is what you're asking for, but your statement about 4 instances is unclear as to whether that is 4 copies of the same DoFn or 4 completely different DoFns. Also, it's unclear what you mean by instance/thread; I'm assuming that you want at most 4 instances of a DoFn, each being processed by a single thread.
>>>>>>> This is a bad idea because you limit your parallelism, but it is similar to what the default file sharding logic does. In Apache Beam the smallest unit of output for a GroupByKey is a single key+iterable pair. We exploit this by assigning all our values to a fixed number of keys and then performing a GroupByKey. This is the same trick that powers the file sharding logic in AvroIO/TextIO/...
>>>>>>>
>>>>>>> Your pipeline would look like (fixed width font diagram):
>>>>>>>
>>>>>>> your data -> apply shard key -> GroupByKey -> partition by key -> your dofn #1
>>>>>>>                                                                \> your dofn #2
>>>>>>>                                                                \> ...
>>>>>>> a / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>>>
>>>>>>> This is not exactly the same as processing with a single DoFn instance/thread because it relies on the Runner to be able to schedule each key to be processed on a different machine. For example, a Runner may choose to process the values 1,[a,c] and 2,[b,d] sequentially on the same machine, or may choose to distribute them.
>>>>>>>
>>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Lukasz,
>>>>>>>> I have a follow-up question about this -
>>>>>>>> What if I want to do something very similar, but instead of having 4 instances of AvroIO following the partition transform, I want 4 instances of a DoFn that I've written? I want to ensure that each partition is processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Ahh I see - OK, I'll try out this solution then. Thanks Lukasz!
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic sharding occurs only if you don't explicitly set a numShards value.
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Lukasz,
>>>>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>>>>> I am running on Dataflow though, which dynamically sets numShards - so if I set numShards to 1 on each of those AvroIO writers, I can't be sure that Dataflow isn't going to override my setting, right? I guess this should work fine as long as I partition my stream into a large enough number of partitions that Dataflow won't override numShards.
>>>>>>>>>>> Josh
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Since you're using a small number of shards, add a Partition transform which uses a deterministic hash of the key to choose one of 4 partitions. Write each partition with a single shard.
>>>>>>>>>>>>
>>>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>>>> Becomes:
>>>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>                       \-> AvroIO(numShards = 1)
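>>>>>>>>>>>> A rough sketch of that in code (sketch only: MyRecord, getKey() and the output path are placeholders, and records is assumed to be a PCollection<MyRecord>; depending on your Beam version, windowed writes may also need an explicit filename policy):
>>>>>>>>>>>>
>>>>>>>>>>>> // deterministic, non-negative partition choice per element
>>>>>>>>>>>> PCollectionList<MyRecord> parts = records.apply(
>>>>>>>>>>>>     Partition.of(4, (Partition.PartitionFn<MyRecord>) (r, numPartitions) ->
>>>>>>>>>>>>         Math.floorMod(r.getKey().hashCode(), numPartitions)));
>>>>>>>>>>>>
>>>>>>>>>>>> // one single-shard writer per partition
>>>>>>>>>>>> for (int i = 0; i < parts.size(); i++) {
>>>>>>>>>>>>   parts.get(i).apply("WriteShard" + i,
>>>>>>>>>>>>       AvroIO.write(MyRecord.class)
>>>>>>>>>>>>           .to("gs://my-bucket/output/shard-" + i + "/part")
>>>>>>>>>>>>           .withWindowedWrites()
>>>>>>>>>>>>           .withNumShards(1));
>>>>>>>>>>>> }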
>>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>>> I would like to partition the stream by some key in the element, so that all elements with the same key get processed by the same shard writer, and are therefore written to the same file. Is there a way to do this? Note that in my stream the number of keys is very large (most elements have a unique key, while a few elements share a key).
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Josh
