Ok I see, thanks Lukasz. I will try this out tomorrow. Sorry for the confusing question!

Josh
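For reference, a minimal sketch of the stateful-DoFn approach Lukasz suggests below might look roughly like this (the class name, key/value types and caching logic are placeholders, it assumes the Beam Java SDK state API, and the input must be keyed before the stateful step):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// State is scoped per key (and per window, hence staying in the global window
// until after this step), which replaces the DoFn/JVM-level cache idea.
class CachingDoFn extends DoFn<KV<String, String>, String> {

  @StateId("cached")
  private final StateSpec<ValueState<String>> cachedSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("cached") ValueState<String> cached) {
    String previous = cached.read();       // null the first time this key is seen
    // ... use `previous` as the per-key cached value ...
    cached.write(c.element().getValue());  // update the cache for this key
    c.output(c.element().getValue());
  }
}

// Pipeline shape (per Lukasz's note, apply Window.into only after the stateful step):
//   source -> key by shard key -> ParDo.of(new CachingDoFn()) -> Window.into(my other window type)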
On Tue, Jun 6, 2017 at 10:01 PM, Lukasz Cwik <[email protected]> wrote:

Based upon your descriptions, it seemed like you wanted limited parallelism because of an external dependency.

Your best bet would be to use the global window combined with a StatefulDoFn. See this blog post (https://beam.apache.org/blog/2017/02/13/stateful-processing.html) about the StatefulDoFn.

You will not be able to use a different window function until after the StatefulDoFn; otherwise a GroupByKey may schedule your work on a different machine, since the windows for a key may differ.

Source -> StatefulDoFn -> Window.into(my other window type)

All our sources currently operate within the global window until a Window.into happens, so there is no need to do:

Source -> Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window type)

On Tue, Jun 6, 2017 at 12:03 PM, <[email protected]> wrote:

Hmm ok, I don't quite get why what I want to do isn't supported in Beam ... I don't actually have a limited parallelism requirement, I just want to be able to partition my unbounded stream by a key determined from the elements, so that any two elements with the same key will be routed to the same worker. I want to do this because my DoFn keeps some in-memory cached state for each key (which I was planning to store at either the DoFn or JVM level). Does this sound like a bad idea?

On 6 Jun 2017, at 19:14, Lukasz Cwik <[email protected]> wrote:

You're right, the window acts as a secondary key within GroupByKey (KeyA,Window1 != KeyA,Window2), which means that each of those two composite keys can be scheduled to execute at the same time.

At this point I think you should challenge your limited-parallelism requirement, as you'll need to build something outside of Apache Beam to provide these parallelization limits across windows (e.g. a lock within the same process when limiting yourself to a single machine, or a distributed lock service when dealing with multiple machines).

The backlog of data is either going to grow infinitely at the GroupByKey or grow infinitely at the source if your pipeline can't keep up. It is up to the Runner to be smart and not produce a giant backlog at the GroupByKey, since it knows how fast work is being completed (unfortunately I don't know if any Runner is smart enough yet to push the backlog up to the source).

On Tue, Jun 6, 2017 at 11:03 AM, Josh <[email protected]> wrote:

I see, thanks for the tips!

Last question about this! How could this be adapted to work in an unbounded/streaming job? To work in an unbounded job, I need to put a Window.into with a trigger before the GroupByKey. I guess this would mean that the "shard gets processed by a single thread in MyDofn" guarantee will only apply to messages within a single window, and would not apply across windows? If this is the case, is there a better solution? I would like to avoid buffering data in windows, and want the shard guarantee to apply across windows.

On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <[email protected]> wrote:

Your code looks like what I was describing. My only comment would be to use a deterministic hashing function which is stable across JVM versions and JVM instances, as it will help in making your pipeline consistent across different runs/environments.

Parallelizing across 8 instances instead of 4 would break the contract around GroupByKey (since it wouldn't group all the elements for a key correctly). Also, each element is the smallest unit of work, and specifically in your pipeline you have chosen to reduce all your elements into 4 logical elements (each containing some proportion of your original data).
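As a concrete example, a stable shard function for the pipeline quoted below could look something like the following sketch (it assumes Guava's Hashing utilities are on the classpath; Math.floorMod also keeps the result non-negative, which matters because a raw hashCode() % 4 can be negative and Partition requires a partition number in [0, numPartitions)):

import java.nio.charset.StandardCharsets;
import com.google.common.hash.Hashing;

class ShardKeys {
  // murmur3_32 is a fixed algorithm, so the result does not depend on the JVM
  // version or instance (unlike Object.hashCode() in general).
  static int shardFor(String key, int numShards) {
    int hash = Hashing.murmur3_32().hashString(key, StandardCharsets.UTF_8).asInt();
    return Math.floorMod(hash, numShards);  // always in [0, numShards)
  }
}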
On Tue, Jun 6, 2017 at 9:37 AM, Josh <[email protected]> wrote:

Thanks for the reply, Lukasz.

What I meant was that I want to shard my data by a "shard key", and be sure that any two elements with the same "shard key" are processed by the same thread on the same worker. (Or, if that's not possible, being processed by the same worker JVM with no thread guarantee would be good enough.) It doesn't actually matter to me whether there are 1 or 4 or 100 DoFn instances processing the data.

It sounds like what you suggested will work for this, with the downside of me needing to choose a number of shards/DoFns (e.g. 4).

It seems a bit long and messy, but am I right in thinking it would look like this?

PCollection<MyElement> elements = ...;

elements
    .apply(MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
            TypeDescriptor.of(MyElement.class)))
        .via((MyElement e) -> KV.of(
            e.getKey().toString().hashCode() % 4, e)))
    .apply(GroupByKey.create())
    .apply(Partition.of(4,
        (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
    .apply(ParDo.of(new MyDofn()));

// Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
// as input instead of just a MyElement

I was wondering, is there a guarantee that the runner won't parallelise the final MyDofn across e.g. 8 instances instead of 4? If there are two input elements with the same key, are they actually guaranteed to be processed on the same instance?

Thanks,
Josh
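For what it's worth, a rough sketch of the reworked MyDofn mentioned above might look like this (MyElement and the per-element logic are placeholders from the thread; each call receives one shard key together with all of the elements grouped for it in the current window/pane):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class MyDofn extends DoFn<KV<Integer, Iterable<MyElement>>, MyElement> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    int shard = c.element().getKey();  // the shard number chosen upstream (0..3)
    // All elements that hashed to this shard key (for this window/pane) arrive
    // together, so any per-shard cached state can be kept in local variables here.
    for (MyElement element : c.element().getValue()) {
      // ... per-element processing ...
      c.output(element);
    }
  }
}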
On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <[email protected]> wrote:

I think this is what you're asking for, but your statement about 4 instances is unclear as to whether that is 4 copies of the same DoFn or 4 completely different DoFns. Also, it's unclear what you mean by instance/thread; I'm assuming that you want at most 4 instances of a DoFn, each being processed by a single thread.

This is a bad idea because you limit your parallelism, but it is similar to what the default file sharding logic does. In Apache Beam the smallest unit of output for a GroupByKey is a single key+iterable pair. We exploit this by assigning all our values to a fixed number of keys and then performing a GroupByKey. This is the same trick that powers the file sharding logic in AvroIO/TextIO/...

Your pipeline would look like (fixed-width font diagram):

your data -> apply shard key -> GroupByKey -> partition by key -> your dofn #1
                                                               \> your dofn #2
                                                               \> ...
a / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???

This is not exactly the same as processing a single DoFn instance/thread, because it relies on the Runner to be able to schedule each key to be processed on a different machine. For example, a Runner may choose to process values 1,[a,c] and 2,[b,d] sequentially on the same machine or may choose to distribute them.

On Tue, Jun 6, 2017 at 8:13 AM, Josh <[email protected]> wrote:

Hey Lukasz,

I have a follow-up question about this - what if I want to do something very similar, but instead of 4 instances of AvroIO following the partition transform, I want 4 instances of a DoFn that I've written? I want to ensure that each partition is processed by a single DoFn instance/thread. Is this possible with Beam?

Thanks,
Josh

On Wed, May 24, 2017 at 6:15 PM, Josh <[email protected]> wrote:

Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!

On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <[email protected]> wrote:

Google Cloud Dataflow won't override your setting. The dynamic sharding occurs only if you don't explicitly set a numShards value.

On Wed, May 24, 2017 at 9:14 AM, Josh <[email protected]> wrote:

Hi Lukasz,

Thanks for the example. That sounds like a nice solution. I am running on Dataflow though, which dynamically sets numShards - so if I set numShards to 1 on each of those AvroIO writers, I can't be sure that Dataflow isn't going to override my setting, right? I guess this should work fine as long as I partition my stream into a large enough number of partitions that Dataflow won't override numShards.

Josh

On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <[email protected]> wrote:

Since you're using a small number of shards, add a Partition transform which uses a deterministic hash of the key to choose one of 4 partitions, and write each partition with a single shard.

(Fixed-width diagram below)

Pipeline -> AvroIO(numShards = 4)

becomes:

Pipeline -> Partition --> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      \-> AvroIO(numShards = 1)

On Wed, May 24, 2017 at 1:05 AM, Josh <[email protected]> wrote:

Hi,

I am using a FileBasedSink (AvroIO.write) on an unbounded stream (withWindowedWrites, hourly windows, numShards=4).

I would like to partition the stream by some key in the element, so that all elements with the same key will get processed by the same shard writer, and therefore written to the same file. Is there a way to do this? Note that in my stream the number of keys is very large (most elements have a unique key, while a few elements share a key).

Thanks,
Josh
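Putting Lukasz's Partition + AvroIO(numShards = 1) suggestion above into code, a rough sketch might look like the following (the output prefix is hypothetical, MyElement is assumed to be an Avro-compatible class, and depending on the Beam version windowed writes may also require an explicit filename policy):

import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

int numPartitions = 4;
PCollection<MyElement> elements = ...;  // the windowed, unbounded input stream

// Route each element to one of 4 partitions using a deterministic hash of its key.
PCollectionList<MyElement> partitions = elements.apply(
    Partition.of(numPartitions,
        (Partition.PartitionFn<MyElement>) (e, n) ->
            Math.floorMod(e.getKey().toString().hashCode(), n)));

// Write each partition with exactly one shard, so all elements sharing a key
// land in the same file for each window.
for (int i = 0; i < partitions.size(); i++) {
  partitions.get(i).apply("WritePartition" + i,
      AvroIO.write(MyElement.class)
          .to("gs://my-bucket/output/partition-" + i)  // hypothetical output prefix
          .withWindowedWrites()
          .withNumShards(1));
}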
