Hi Lukasz,

I've just tried this with the state API and a stateful DoFn on the
global window, as you suggested, and it seems to work very well.

I was wondering how efficient it is on the Dataflow runner when, for
example, several elements with the same key arrive within a few
milliseconds of one another, e.g. (k1, v1), (k1, v2), (k1, v3) ..., and
my stateful DoFn's processElement method reads and updates the state via
state.read() and state.write(...). Does each call read from and write to
an external store, or is it all done in memory? I'm wondering how it
will scale to a higher-volume stream.

Thanks,
Josh



On Tue, Jun 6, 2017 at 11:18 PM, Josh <[email protected]> wrote:

> Ok I see, thanks Lukasz. I will try this out tomorrow.
>
> Sorry for the confusing question!
>
> Josh
>
> On Tue, Jun 6, 2017 at 10:01 PM, Lukasz Cwik <[email protected]> wrote:
>
>> Based upon your descriptions, it seemed like you wanted limited
>> parallelism because of an external dependency.
>>
>> Your best bet would be to use the global window combined with a
>> StatefulDoFn. See this blog post about the StatefulDoFn:
>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>
>> You will not be able to use a different window function until after the
>> StatefulDoFn; otherwise a GroupByKey may schedule your work on a different
>> machine, since the windows for a key may differ.
>>
>> Source -> StatefulDoFn -> Window.into(my other window type)
>>
>> All our sources currently operate within the global window until a
>> Window.into happens. So there is no need to do Source ->
>> Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window
>> type)
>>
>>
>> On Tue, Jun 6, 2017 at 12:03 PM, <[email protected]> wrote:
>>
>>> Hmm ok, I don't quite get why what I want to do isn't supported in Beam
>>> ... I don't actually have a limited parallelism requirement, I just want to
>>> be able to partition my unbounded stream by a key determined from the
>>> elements, so that any two elements with the same key will be routed to the
>>> same worker. I want to do this because my DoFn keeps some in-memory cached
>>> state for each key (which I was planning to store at either DoFn or JVM
>>> level). Does this sound like a bad idea?
>>>
>>>
>>> On 6 Jun 2017, at 19:14, Lukasz Cwik <[email protected]> wrote:
>>>
>>> You're right, the window acts as a secondary key within GroupByKey
>>> (KeyA,Window1 != KeyA,Window2), which means that each of those two
>>> composite keys can be scheduled to execute at the same time.
>>>
>>> At this point I think you should challenge your limited parallelism
>>> requirement as you'll need to build something outside of Apache Beam to
>>> provide these parallelization limits across windows (e.g. lock within the
>>> same process when limiting yourself to a single machine, distributed lock
>>> service when dealing with multiple machines).
>>>
>>> The backlog of data is either going to grow infinitely at the GroupByKey
>>> or grow infinitely at the source if your pipeline can't keep up. It is up
>>> to the Runner to be smart and not produce a giant backlog at the GroupByKey
>>> since it knows how fast work is being completed (unfortunately I don't know
>>> if any Runner is this smart yet to push the backlog up to the source).
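
The remark above that the window acts as a secondary key can be illustrated with a plain-Java analogy. This is only a sketch, not Beam code: the class and method names are hypothetical, and a map keyed by a (key, window) pair merely stands in for GroupByKey. It shows that two elements with the same key but different windows land in separate groups, which is why nothing ties them to the same worker.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeyGrouping {

    /**
     * Groups {key, window, value} triples by the composite (key, window) pair.
     * Hypothetical stand-in for how grouping treats the window as part of the key.
     */
    public static Map<Map.Entry<String, String>, List<String>> group(String[][] triples) {
        Map<Map.Entry<String, String>, List<String>> groups = new LinkedHashMap<>();
        for (String[] t : triples) {
            // The grouping key is the pair (key, window), not the key alone.
            Map.Entry<String, String> compositeKey = new SimpleImmutableEntry<>(t[0], t[1]);
            groups.computeIfAbsent(compositeKey, k -> new ArrayList<>()).add(t[2]);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[][] input = {
            {"KeyA", "Window1", "v1"},
            {"KeyA", "Window2", "v2"},
            {"KeyA", "Window1", "v3"},
        };
        // Each (key, window) pair is printed as its own independent group.
        group(input).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Each resulting group is an independent unit of work, so a runner is free to process (KeyA, Window1) and (KeyA, Window2) concurrently.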
>>>
>>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <[email protected]> wrote:
>>>
>>>> I see, thanks for the tips!
>>>>
>>>> Last question about this! How could this be adapted to work in an
>>>> unbounded/streaming job? To work in an unbounded job, I need to put a
>>>> Window.into with a trigger before GroupByKey.
>>>> I guess this would mean that the "shard gets processed by a single
>>>> thread in MyDofn" guarantee will only apply to messages within a single
>>>> window, and would not apply across windows?
>>>> If this is the case, is there a better solution? I would like to avoid
>>>> buffering data in windows, and want the shard guarantee to apply across
>>>> windows.
>>>>
>>>>
>>>>
>>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Your code looks like what I was describing. My only comment would be
>>>>> to use a deterministic hashing function which is stable across JVM
>>>>> versions and JVM instances, as it will help in making your pipeline
>>>>> consistent across different runs/environments.
>>>>>
>>>>> Parallelizing across 8 instances instead of 4 would break the contract
>>>>> around GroupByKey (since it didn't group all the elements for a key
>>>>> correctly). Also, each element is the smallest unit of work and
>>>>> specifically in your pipeline you have chosen to reduce all your elements
>>>>> into 4 logical elements (each containing some proportion of your original
>>>>> data).
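
One way to follow the deterministic-hashing advice above is sketched below. It assumes the key can be rendered as a stable string, and it uses CRC32 over the UTF-8 bytes purely as an example of a hash whose result does not depend on the JVM; the class and method names are hypothetical and not part of Beam.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ShardKeys {

    /**
     * Maps a key to a shard index in [0, numShards).
     * CRC32 is computed over the key's UTF-8 bytes, so the result is the same
     * on every JVM, unlike a hash derived from Object.hashCode().
     */
    public static int shardFor(String key, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long, so the remainder is never negative.
        return (int) (crc.getValue() % numShards);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.println(key + " -> shard " + shardFor(key, 4));
        }
    }
}
```

In the pipeline quoted below, a helper like this would take the place of the hashCode() % 4 expression when building the KV.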
>>>>>
>>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <[email protected]> wrote:
>>>>>
>>>>>> Thanks for the reply, Lukasz.
>>>>>>
>>>>>>
>>>>>> What I meant was that I want to shard my data by a "shard key", and
>>>>>> be sure that any two elements with the same "shard key" are processed by
>>>>>> the same thread on the same worker. (Or, if that's not possible, by the
>>>>>> same worker JVM with no thread guarantee would be good enough.) It doesn't
>>>>>> actually matter to me whether there are 1 or 4 or 100 DoFn instances
>>>>>> processing the data.
>>>>>>
>>>>>>
>>>>>> It sounds like what you suggested will work for this, with the
>>>>>> downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>>>
>>>>>> It seems a bit long and messy but am I right in thinking it would
>>>>>> look like this? ...
>>>>>>
>>>>>>
>>>>>> PCollection<MyElement> elements = ...;
>>>>>>
>>>>>> elements
>>>>>>     .apply(MapElements
>>>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>>>             TypeDescriptor.of(MyElement.class)))
>>>>>>         // floorMod keeps the shard in [0, 4) even when hashCode() is negative
>>>>>>         .via((MyElement e) -> KV.of(
>>>>>>             Math.floorMod(e.getKey().toString().hashCode(), 4), e)))
>>>>>>     .apply(GroupByKey.create())
>>>>>>     .apply(Partition.of(4,
>>>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>>>>>>             kv.getKey()))
>>>>>>     .apply(ParDo.of(new MyDofn()));
>>>>>>
>>>>>> // Where MyDofn must be changed to handle a KV<Integer,
>>>>>> // Iterable<MyElement>> as input instead of just a MyElement
>>>>>>
>>>>>>
>>>>>> I was wondering whether there is a guarantee that the runner won't
>>>>>> parallelise the final MyDofn across e.g. 8 instances instead of 4. If
>>>>>> there are two input elements with the same key, are they actually
>>>>>> guaranteed to be processed on the same instance?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> I think this is what you're asking for, but your statement about 4
>>>>>>> instances is unclear as to whether that is 4 copies of the same DoFn or
>>>>>>> 4 completely different DoFns. Also, it's unclear what you mean by
>>>>>>> instance/thread; I'm assuming that you want at most 4 instances of a
>>>>>>> DoFn, each being processed by a single thread.
>>>>>>>
>>>>>>> This is a bad idea because you limit your parallelism, but it is
>>>>>>> similar to what the default file sharding logic does. In Apache Beam the
>>>>>>> smallest unit of output for a GroupByKey is a single key+iterable pair.
>>>>>>> We exploit this by assigning all our values to a fixed number of keys
>>>>>>> and then performing a GroupByKey. This is the same trick that powers the
>>>>>>> file sharding logic in AvroIO/TextIO/...
>>>>>>>
>>>>>>> Your pipeline would look like (fixed width font diagram):
>>>>>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>>>>>                                                                                  \> your dofn #2
>>>>>>>                                                                                  \> ...
>>>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>>>
>>>>>>> This is not exactly the same as processing a single DoFn
>>>>>>> instance/thread because it relies on the Runner to be able to schedule
>>>>>>> each key to be processed on a different machine. For example, a Runner
>>>>>>> may choose to process values 1,[a,c] and 2,[b,d] sequentially on the same
>>>>>>> machine or may choose to distribute them.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Lukasz,
>>>>>>>>
>>>>>>>> I have a follow up question about this -
>>>>>>>>
>>>>>>>> What if I want to do something very similar, but instead of 4
>>>>>>>> instances of AvroIO following the partition transform, I want 4
>>>>>>>> instances of a DoFn that I've written? I want to ensure that each
>>>>>>>> partition is processed by a single DoFn instance/thread. Is this
>>>>>>>> possible with Beam?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>>>>> sharding occurs only if you don't explicitly set a numShards value.
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Lukasz,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the example. That sounds like a nice solution.
>>>>>>>>>>> I am running on Dataflow though, which dynamically sets
>>>>>>>>>>> numShards, so if I set numShards to 1 on each of those AvroIO
>>>>>>>>>>> writers, I can't be sure that Dataflow isn't going to override my
>>>>>>>>>>> setting, right? I guess this should work fine as long as I partition
>>>>>>>>>>> my stream into a large enough number of partitions so that Dataflow
>>>>>>>>>>> won't override numShards.
>>>>>>>>>>>
>>>>>>>>>>> Josh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>>>>>> transform which uses a deterministic hash of the key to choose one
>>>>>>>>>>>> of 4 partitions. Write each partition with a single shard.
>>>>>>>>>>>>
>>>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>>>> Becomes:
>>>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded
>>>>>>>>>>>>> stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to partition the stream by some key in the
>>>>>>>>>>>>> element, so that all elements with the same key will get
>>>>>>>>>>>>> processed by the same shard writer, and therefore written to the
>>>>>>>>>>>>> same file. Is there a way to do this? Note that in my stream the
>>>>>>>>>>>>> number of keys is very large (most elements have a unique key,
>>>>>>>>>>>>> while a few elements share a key).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Josh
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
