Whenever state is used, the runner arranges for all elements with the
same key to go to the same worker, which often involves injecting a
shuffle-like operation if the keys are spread out among many workers
in the input. (An alternative implementation could involve storing the
state in a distributed transactional store with the appropriate
locks.) There is no need for you to do anything before calling the
Deduplicate transform.
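
To make the routing idea concrete, here is a minimal plain-Java sketch. It is not Beam code and not how any particular runner is implemented; the class name, the `workerFor` hash rule, and the per-worker maps are all assumptions made up for illustration. It only shows why a "seen" flag needs no locking once every key is pinned to a single worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedDedupSketch {
    // Hypothetical routing rule: a given key always maps to the same worker index.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Simulates (sequentially) a shuffle-like step followed by per-key "seen" state.
    static List<String> dedup(List<String> input, int numWorkers) {
        // One state map per simulated worker; a key's state lives on exactly one of them.
        List<Map<String, Boolean>> seenStatePerWorker = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            seenStatePerWorker.add(new HashMap<>());
        }

        List<String> output = new ArrayList<>();
        for (String element : input) {
            // Route the element to the worker that owns its key.
            Map<String, Boolean> seenState =
                seenStatePerWorker.get(workerFor(element, numWorkers));
            // Same check as in the quoted snippet: unset state means "not seen yet".
            if (seenState.get(element) == null) {
                seenState.put(element, true);
                output.add(element);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Prints [a, b, c]
        System.out.println(dedup(Arrays.asList("a", "b", "a", "c", "b"), 3));
    }
}
```

Because both copies of "a" land on the same simulated worker, the second one always sees the flag written by the first, so the two-workers-read-null scenario cannot arise for a single key.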

On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van <[email protected]> wrote:
>
> Thanks Reuven,
>
> I understand that state is per key and that keys are distributed across
> workers, but I am trying to understand where and how the distribution is
> implemented so that elements with the same key go to the same worker.
> Do I need to do this before calling the `Deduplicate` transform? If not,
> where is it implemented?
>
> Thanks
> -Binh
>
>
> On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <[email protected]> 
> wrote:
>>
>> State is per-key, and keys are distributed across workers. Two workers 
>> should not be working on the same state.
>>
>> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <[email protected]> wrote:
>>>
>>> Thank you Ankur,
>>>
>>> This is the current source code of Deduplicate transform.
>>>
>>>       Boolean seen = seenState.read();
>>>       // Seen state is either set or not set, so if it has been set then it
>>>       // must be true.
>>>       if (seen == null) {
>>>         // We don't want the expiry timer to hold up watermarks.
>>>         expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>>         seenState.write(true);
>>>         receiver.output(element);
>>>       }
>>>
>>> Could you please explain the synchronization for the following scenario?
>>>
>>> 1. There are two workers.
>>> 2. Both workers read the same state at the same time and the state was not
>>>    set yet. In this case, both will get null in the response (I believe).
>>> 3. Both of them will try to set the state and send the output out.
>>>
>>> What will happen in this scenario?
>>>
>>> Thank you
>>> -Binh
>>>
>>>
>>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <[email protected]> wrote:
>>>>
>>>> Hi Binh, the Deduplicate transform uses the state API to do the
>>>> de-duplication, which should take care of what is needed to work across
>>>> multiple concurrent workers.
>>>>
>>>> Thanks,
>>>> Ankur
>>>>
>>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <[email protected]> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a pipeline and want to apply deduplication. I looked at the
>>>>> Deduplicate transform that Beam provides and wonder about its usage. Do I
>>>>> need to shuffle the input collection by key before calling this
>>>>> transform? I looked at its source code and it doesn’t do any shuffle,
>>>>> so I wonder how it works when, let’s say, there are duplicates and the
>>>>> duplicated elements are processed concurrently on multiple workers.
>>>>>
>>>>> Thank you
>>>>> -Binh
