Whenever state is used, the runner arranges for all elements with the same key to go to the same worker. If elements with a given key are spread across many workers in the input, this often involves injecting a shuffle-like operation. (An alternative implementation could store the state in a distributed transactional store with the appropriate locks.) Because two workers should never be processing the same key's state, the read-then-write sequence in the Deduplicate code you quoted does not race between workers. There is no need for you to do anything before calling the Deduplicate transform.
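To make the routing idea concrete, here is a minimal single-process sketch in plain Java with no Beam dependency. It assumes a runner that hash-partitions elements by key across a fixed number of workers. The names `KeyRoutingSketch`, `Worker`, `workerFor` and `deduplicate` are hypothetical and are not Beam APIs. Each simulated worker keeps its own local "seen" set, which is only sufficient because every occurrence of a key is routed to the same worker.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyRoutingSketch {
    // Hypothetical stand-in for a worker: holds per-key "seen" state that
    // no other worker ever touches.
    static final class Worker {
        private final Set<String> seen = new HashSet<>();
        private final List<String> output = new ArrayList<>();

        void process(String key) {
            // Same check-then-write shape as Deduplicate: emit only the
            // first time this worker sees the key.
            if (seen.add(key)) {
                output.add(key);
            }
        }

        List<String> output() {
            return output;
        }
    }

    // Hypothetical routing function: a stable hash of the key picks the
    // worker, so every occurrence of a key lands on the same worker.
    public static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    public static List<String> deduplicate(List<String> input, int numWorkers) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            workers.add(new Worker());
        }
        // The "shuffle": send each element to the worker that owns its key.
        for (String key : input) {
            workers.get(workerFor(key, numWorkers)).process(key);
        }
        List<String> result = new ArrayList<>();
        for (Worker w : workers) {
            result.addAll(w.output());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "a", "c", "b", "a");
        System.out.println(deduplicate(input, 3).size()); // prints 3
    }
}
```

If the routing step were removed and elements were assigned to workers arbitrarily, two workers could each hold a "first" copy of the same key in their local sets, which is exactly the scenario asked about below.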
On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van <[email protected]> wrote:
>
> Thanks Reuven,
>
> I got the idea of the state is per key and keys are distributed across
> workers but I am trying to understand where/how the distribution part is
> implemented so that elements with the same keys will go to the same worker.
> Do I need to do this before calling `Deduplicate` transform? If not then
> where is it being implemented?
>
> Thanks
> -Binh
>
>
> On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <[email protected]>
> wrote:
>>
>> State is per-key, and keys are distributed across workers. Two workers
>> should not be working on the same state.
>>
>> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <[email protected]> wrote:
>>>
>>> Thank you Ankur,
>>>
>>> This is the current source code of Deduplicate transform.
>>>
>>>   Boolean seen = seenState.read();
>>>   // Seen state is either set or not set so if it has been set then it must be true.
>>>   if (seen == null) {
>>>     // We don't want the expiry timer to hold up watermarks.
>>>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>>     seenState.write(true);
>>>     receiver.output(element);
>>>   }
>>>
>>> Could you please explain the synchronization for the following scenario?
>>>
>>> There are two workers.
>>> Both workers read the same state at the same time and the state was not set
>>> yet. In this case, both will get null in the response (I believe)
>>> Both of them will try to set the state and send the output out.
>>>
>>> What will happen in this scenario?
>>>
>>> Thank you
>>> -Binh
>>>
>>>
>>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <[email protected]> wrote:
>>>>
>>>> Hi Binh, The Deduplicate transform uses state api to do the de-duplication
>>>> which should do the needful operations to work across multiple concurrent
>>>> workers.
>>>>
>>>> Thanks,
>>>> Ankur
>>>>
>>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <[email protected]> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a pipeline and want to apply deduplication. I look at
>>>>> Deduplicate transform that Beam provides and wonder about its usage. Do I
>>>>> need to shuffle input collection by key before calling this
>>>>> transformation? I look at its source code and it doesn’t do any shuffle
>>>>> so wonder how it works when let’s say there are duplicates and the
>>>>> duplicated elements are processed concurrently on multiple workers.
>>>>>
>>>>> Thank you
>>>>> -Binh
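The two-worker race described in the thread can also be sketched in plain Java. This is a hedged illustration, not Beam's implementation: it assumes the runner serializes processing per key, modeled here by routing each key to one single-threaded executor that stands in for the worker owning that key. The class name `PerKeySerializationSketch` and its `run` method are hypothetical. Several sender threads deliver the same duplicated elements concurrently, yet because the read of `seen`, the null check and the write all run on the one thread that owns the key, each key is emitted exactly once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PerKeySerializationSketch {
    // Delivers every element of `input` once per sender thread (so duplicates
    // arrive concurrently) and returns what the simulated workers emitted.
    public static List<String> run(List<String> input, int numWorkers, int numSenders) {
        // One single-threaded executor per simulated worker, each with its own
        // state map that only that worker's thread ever reads or writes.
        List<ExecutorService> workers = new ArrayList<>();
        List<Map<String, Boolean>> seenState = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            workers.add(Executors.newSingleThreadExecutor());
            seenState.add(new HashMap<>());
        }
        Queue<String> output = new ConcurrentLinkedQueue<>();
        ExecutorService senders = Executors.newFixedThreadPool(numSenders);
        List<Future<?>> sent = new ArrayList<>();
        for (int s = 0; s < numSenders; s++) {
            sent.add(senders.submit(() -> {
                for (String key : input) {
                    // Routing: a key always maps to the same worker.
                    int w = Math.floorMod(key.hashCode(), numWorkers);
                    Map<String, Boolean> state = seenState.get(w);
                    workers.get(w).submit(() -> {
                        // Same shape as the quoted snippet: read, null check,
                        // write, output. Safe because one thread owns the key.
                        Boolean seen = state.get(key);
                        if (seen == null) {
                            state.put(key, true);
                            output.add(key);
                        }
                    });
                }
            }));
        }
        try {
            for (Future<?> f : sent) {
                f.get(); // every element has been handed to its worker
            }
            for (ExecutorService w : workers) {
                w.shutdown(); // accept no new work, finish queued tasks
                if (!w.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("worker did not finish in time");
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            senders.shutdownNow();
            for (ExecutorService w : workers) {
                w.shutdownNow();
            }
        }
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        List<String> out = run(List.of("a", "b", "a", "c"), 2, 4);
        System.out.println(out.size()); // prints 3
    }
}
```

The design choice here mirrors Reuven's first answer: rather than locking shared state, the sketch avoids sharing it, so no per-read synchronization is needed. The alternative he mentions, a transactional store with locks, would instead make the check-and-set itself atomic.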
