Re: Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
That explains it. Thank you Robert and all!

-Binh


On Thu, Mar 2, 2023 at 4:51 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> Whenever state is used, the runner arranges things so that all elements
> with the same key go to the same worker, which often involves injecting
> a shuffle-like operation if the keys are spread out among many workers
> in the input. (An alternative implementation could store the state in a
> distributed transactional store with the appropriate locks.) There is no
> need for you to do anything before calling the Deduplicate transform.
>
> On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van  wrote:
> >
> > Thanks Reuven,
> >
> > I get that state is per key and that keys are distributed across
> > workers, but I am trying to understand where/how the distribution part
> > is implemented so that elements with the same key go to the same worker.
> > Do I need to do this before calling the `Deduplicate` transform? If not,
> > where is it implemented?
> >
> > Thanks
> > -Binh
> >
> >
> > On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <
> user@beam.apache.org> wrote:
> >>
> >> State is per-key, and keys are distributed across workers. Two workers
> should not be working on the same state.
> >>
> >> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van 
> wrote:
> >>>
> >>> Thank you Ankur,
> >>>
> >>> This is the current source code of Deduplicate transform.
> >>>
> >>>   Boolean seen = seenState.read();
> >>>   // Seen state is either set or not set so if it has been set then it must be true.
> >>>   if (seen == null) {
> >>>     // We don't want the expiry timer to hold up watermarks.
> >>>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
> >>>     seenState.write(true);
> >>>     receiver.output(element);
> >>>   }
> >>>
> >>> Could you please explain the synchronization for the following
> scenario?
> >>>
> >>> There are two workers.
> >>> Both workers read the same state at the same time and the state was
> not set yet. In this case, both will get null in the response (I believe)
> >>> Both of them will try to set the state and send the output out.
> >>>
> >>> What will happen in this scenario?
> >>>
> >>> Thank you
> >>> -Binh
> >>>
> >>>
> >>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka 
> wrote:
> 
>  Hi Binh, the Deduplicate transform uses the state API to do the
> de-duplication, which takes care of what is needed for it to work across
> multiple concurrent workers.
> 
>  Thanks,
>  Ankur
> 
>  On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van 
> wrote:
> >
> > Hi,
> >
> > I am writing a pipeline and want to apply deduplication. I looked at
> > the Deduplicate transform that Beam provides and wonder about its usage.
> > Do I need to shuffle the input collection by key before calling this
> > transform? I looked at its source code and it doesn't do any shuffle, so
> > I wonder how it works when, say, there are duplicates and the duplicated
> > elements are processed concurrently on multiple workers.
> >
> > Thank you
> > -Binh
>


Re: Deduplicate usage

2023-03-02 Thread Robert Bradshaw via user
Whenever state is used, the runner arranges things so that all elements
with the same key go to the same worker, which often involves injecting
a shuffle-like operation if the keys are spread out among many workers
in the input. (An alternative implementation could store the state in a
distributed transactional store with the appropriate locks.) There is no
need for you to do anything before calling the Deduplicate transform.
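
To make that concrete, here is a minimal sketch (not from this thread) of
calling Deduplicate with no keying or shuffle beforehand; the class name,
the sample data, and the 10-minute duration are illustrative assumptions:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class DeduplicateSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Duplicates on purpose; no GroupByKey/Reshuffle is applied by hand.
    PCollection<String> events = p.apply(Create.of("a", "b", "a", "c", "b"));

    // Deduplicate.values() keys each element by its own value internally,
    // so the runner routes equal elements to the same worker's state cell.
    PCollection<String> deduped =
        events.apply(
            Deduplicate.<String>values()
                .withDuration(Duration.standardMinutes(10)));  // how long an element is remembered

    p.run().waitUntilFinish();
  }
}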

On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van  wrote:
>
> Thanks Reuven,
>
> I get that state is per key and that keys are distributed across
> workers, but I am trying to understand where/how the distribution part is
> implemented so that elements with the same key go to the same worker.
> Do I need to do this before calling the `Deduplicate` transform? If not,
> where is it implemented?
>
> Thanks
> -Binh
>
>
> On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user  
> wrote:
>>
>> State is per-key, and keys are distributed across workers. Two workers 
>> should not be working on the same state.
>>
>> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van  wrote:
>>>
>>> Thank you Ankur,
>>>
>>> This is the current source code of Deduplicate transform.
>>>
>>>   Boolean seen = seenState.read();
>>>   // Seen state is either set or not set so if it has been set then it must be true.
>>>   if (seen == null) {
>>>     // We don't want the expiry timer to hold up watermarks.
>>>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>>     seenState.write(true);
>>>     receiver.output(element);
>>>   }
>>>
>>> Could you please explain the synchronization for the following scenario?
>>>
>>> There are two workers.
>>> Both workers read the same state at the same time and the state was not set 
>>> yet. In this case, both will get null in the response (I believe)
>>> Both of them will try to set the state and send the output out.
>>>
>>> What will happen in this scenario?
>>>
>>> Thank you
>>> -Binh
>>>
>>>
>>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka  wrote:

 Hi Binh, the Deduplicate transform uses the state API to do the de-duplication,
 which takes care of what is needed for it to work across multiple concurrent
 workers.

 Thanks,
 Ankur

 On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>
> Hi,
>
> I am writing a pipeline and want to apply deduplication. I looked at
> the Deduplicate transform that Beam provides and wonder about its usage.
> Do I need to shuffle the input collection by key before calling this
> transform? I looked at its source code and it doesn't do any shuffle, so
> I wonder how it works when, say, there are duplicates and the duplicated
> elements are processed concurrently on multiple workers.
>
> Thank you
> -Binh


Re: Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
Thanks Reuven,

I get that state is per key and that keys are distributed across
workers, but I am trying to understand where/how the distribution part is
implemented so that elements with the same key go to the same worker.
Do I need to do this before calling the `Deduplicate` transform? If not,
where is it implemented?

Thanks
-Binh


On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user 
wrote:

> State is per-key, and keys are distributed across workers. Two workers
> should not be working on the same state.
>
> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van 
> wrote:
>
>> Thank you Ankur,
>>
>> This is the current source code of Deduplicate transform.
>>
>>   Boolean seen = seenState.read();
>>   // Seen state is either set or not set so if it has been set then it must be true.
>>   if (seen == null) {
>>     // We don't want the expiry timer to hold up watermarks.
>>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>     seenState.write(true);
>>     receiver.output(element);
>>   }
>>
>> Could you please explain the synchronization for the following scenario?
>>
>>- There are two workers.
>>- Both workers read the same state at the same time and the state was
>>not set yet. In this case, both will get null in the response (I
>>believe)
>>- Both of them will try to set the state and send the output out.
>>
>> What will happen in this scenario?
>>
>> Thank you
>> -Binh
>>
>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka 
>> wrote:
>>
>>> Hi Binh, the Deduplicate transform uses the state API to do the
>>> de-duplication, which takes care of what is needed for it to work
>>> across multiple concurrent workers.
>>>
>>> Thanks,
>>> Ankur
>>>
>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>>>
 Hi,

 I am writing a pipeline and want to apply deduplication. I looked at
 the Deduplicate transform that Beam provides and wonder about its usage.
 Do I need to shuffle the input collection by key before calling this
 transform? I looked at its source code and it doesn't do any shuffle, so
 I wonder how it works when, say, there are duplicates and the duplicated
 elements are processed concurrently on multiple workers.

 Thank you
 -Binh

>>>


Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Talat Uyarer via user
Hi Andrew,

Thank you so much for your help. Sorry to hear you changed teams :( I can
handle a Calcite upgrade if there is a fix. I was working on a Calcite
upgrade, but then we started having so many issues; that's why I stopped.

Talat

On Thu, Mar 2, 2023 at 11:56 AM Andrew Pilloud  wrote:

> Hi Talat,
>
> I managed to turn your test case into something against Calcite. It
> looks like there is a bug affecting tables that contain one or more
> single-element structs and no multi-element structs. I've sent the
> details to the Calcite mailing list here:
>
> https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
>
> I'm experimenting with ideas on how to work around this but a fix will
> likely require a Calcite upgrade, which is not something I'd have time
> to help with. (I'm not on the Google Beam team anymore.)
>
> Andrew
>
> On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
>  wrote:
> >
> > Hi @Andrew Pilloud
> >
> > Sorry for the late response. Yes, your test is working fine. I changed
> > the test input structure to match our input structure. Now this test
> > also has the same exception.
> >
> > Feb 21, 2023 2:02:28 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQL:
> > WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`,
> `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> > FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> > WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT
> `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> > FROM `tempTable` AS `tempTable`)
> > Feb 21, 2023 2:02:28 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQLPlan>
> > LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
> >   LogicalFilter(condition=[=($0.name, 'innerStr')])
> > LogicalProject(name=[$0.name], id=[$1], value=[$2])
> >   BeamIOSourceRel(table=[[beam, panwRowTestTable]])
> >
> >
> > fieldList must not be null, type = VARCHAR
> > java.lang.AssertionError: fieldList must not be null, type = VARCHAR
> >
> > I don't know what is different from yours. I am also sharing my version
> > of the test.
> >
> >
> > Index: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > IDEA additional info:
> > Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> > <+>UTF-8
> > ===
> > diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> > +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (date 1677017032324)
> > @@ -54,6 +54,9 @@
> >    private static final Schema innerRowSchema =
> >        Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
> >
> > +  private static final Schema innerPanwRowSchema =
> > +      Schema.builder().addStringField("name").build();
> > +
> >    private static final Schema innerRowWithArraySchema =
> >        Schema.builder()
> >            .addStringField("string_field")
> > @@ -127,8 +130,12 @@
> >                .build()))
> >            .put(
> >                "basicRowTestTable",
> > -              TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
> > -                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> > +              TestBoundedTable.of(FieldType.row(innerRowSchema), "col", FieldType.INT64, "field")
> > +                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L))
> > +          .put(
> > +              "panwRowTestTable",
> > +              TestBoundedTable.of(FieldType.row(innerPanwRowSchema), "user_info", FieldType.INT64, "id", FieldType.STRING, "value")
> > +                  .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, "some_value"))
> >            .put(
> >                "rowWithArrayTestTable",
> >                TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
> > @@ -219,6 +226,21 @@
> >                .build());
> >      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> >    }
> > +
> > +  @Test
> > +  public void testBasicRowWhereField() {
> > +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> > +    PCollection stream =
> > +        BeamSqlRelUtils.toPCollection(
> > +            pipeline

Re: Successful Inserts for Storage Write API?

2023-03-02 Thread Reuven Lax via user
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is
not currently supported for Storage Write API.
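
For reference, a rough sketch (the table spec, triggering frequency, and
stream count are placeholder assumptions) of what the Storage Write API path
exposes instead — the failed inserts rather than the successful ones:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class StorageWriteFailures {
  // Writes `rows` with the Storage Write API and returns the failed-insert stream.
  static PCollection<BigQueryStorageApiInsertError> writeAndGetFailures(
      PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")  // placeholder table spec
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                .withTriggeringFrequency(Duration.standardSeconds(5))
                .withNumStorageWriteApiStreams(3));
    // There is no getSuccessfulInserts() on this path; the observable signal
    // is the failed rows, each carrying the offending row and an error message.
    return result.getFailedStorageApiInserts();
  }
}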

On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang 
wrote:

> Thank you to Ahmed and Reuven for the tip on
> WriteResult::getFailedStorageApiInserts.
>
> When I tried to get the successful inserts through the Storage Write API,
> I received an error message saying that "Retrieving successful inserts is
> only supported for streaming inserts. Make sure
> withSuccessfulInsertsPropagation is correctly configured for
> BigQueryIO.Write object."  Did I make a mistake, or is there a reason why
> this is not possible?  I tried setting triggeringFrequency +
> numStorageWriteApiStreams as required by Storage Write, and I tried to set
> successfulInsertsPropagation as directed in the error message.
>


Successful Inserts for Storage Write API?

2023-03-02 Thread Matthew Ouyang
Thank you to Ahmed and Reuven for the tip on WriteResult::getFailedStorageApiInserts.

When I tried to get the successful inserts through the Storage Write API, I
received an error message saying that "Retrieving successful inserts is
only supported for streaming inserts. Make sure
withSuccessfulInsertsPropagation is correctly configured for
BigQueryIO.Write object."  Did I make a mistake, or is there a reason why
this is not possible?  I tried setting triggeringFrequency +
numStorageWriteApiStreams as required by Storage Write, and I tried to set
successfulInsertsPropagation as directed in the error message.
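
For comparison, a sketch of the configuration that error message is
describing — successful-insert propagation is tied to the legacy
streaming-inserts method, not Storage Write (the table spec is a placeholder
assumption):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

class StreamingInsertsSuccesses {
  // getSuccessfulInserts() is only wired up for STREAMING_INSERTS with
  // successful-insert propagation enabled, which is what the error points at.
  static PCollection<TableRow> writeAndGetSuccesses(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")  // placeholder table spec
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withSuccessfulInsertsPropagation(true));
    return result.getSuccessfulInserts();  // the rows that were inserted
  }
}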


Re: Deduplicate usage

2023-03-02 Thread Reuven Lax via user
State is per-key, and keys are distributed across workers. Two workers
should not be working on the same state.
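
To illustrate the per-key contract with a sketch that is not Beam's own code:
a stateful DoFn only ever sees the state cell for the current element's key
and window, so two workers never race on the same cell.

import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative only: emits a key's value the first time that key is seen.
// The runner partitions state by key (and window), so all elements for one
// key are processed against the same "seen" cell on the same worker.
class SeenOnceFn extends DoFn<KV<String, String>, String> {
  @StateId("seen")
  private final StateSpec<ValueState<Boolean>> seenSpec =
      StateSpecs.value(BooleanCoder.of());

  @ProcessElement
  public void process(ProcessContext c, @StateId("seen") ValueState<Boolean> seen) {
    if (seen.read() == null) {  // first element for this key in this window
      seen.write(true);
      c.output(c.element().getValue());
    }
  }
}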

On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van  wrote:

> Thank you Ankur,
>
> This is the current source code of Deduplicate transform.
>
>   Boolean seen = seenState.read();
>   // Seen state is either set or not set so if it has been set then it must be true.
>   if (seen == null) {
>     // We don't want the expiry timer to hold up watermarks.
>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>     seenState.write(true);
>     receiver.output(element);
>   }
>
> Could you please explain the synchronization for the following scenario?
>
>- There are two workers.
>- Both workers read the same state at the same time and the state was
>not set yet. In this case, both will get null in the response (I
>believe)
>- Both of them will try to set the state and send the output out.
>
> What will happen in this scenario?
>
> Thank you
> -Binh
>
> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka 
> wrote:
>
>> Hi Binh, the Deduplicate transform uses the state API to do the
>> de-duplication, which takes care of what is needed for it to work
>> across multiple concurrent workers.
>>
>> Thanks,
>> Ankur
>>
>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>>
>>> Hi,
>>>
>>> I am writing a pipeline and want to apply deduplication. I looked at
>>> the Deduplicate transform that Beam provides and wonder about its usage.
>>> Do I need to shuffle the input collection by key before calling this
>>> transform? I looked at its source code and it doesn't do any shuffle, so
>>> I wonder how it works when, say, there are duplicates and the duplicated
>>> elements are processed concurrently on multiple workers.
>>>
>>> Thank you
>>> -Binh
>>>
>>


Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Andrew Pilloud via user
Hi Talat,

I managed to turn your test case into something against Calcite. It
looks like there is a bug affecting tables that contain one or more
single-element structs and no multi-element structs. I've sent the
details to the Calcite mailing list here:
https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
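
For anyone skimming, the table shape being described is roughly the
following (field names are taken from the test diff quoted below; the class
wrapper is just for illustration):

import org.apache.beam.sdk.schemas.Schema;

class SingleElementStructShape {
  // A ROW column whose nested struct has exactly one field, and no wider
  // struct anywhere else in the table -- the shape the bug report is about.
  static final Schema INNER = Schema.builder().addStringField("name").build();
  static final Schema TABLE =
      Schema.builder()
          .addRowField("user_info", INNER)  // single-element struct
          .addInt64Field("id")
          .addStringField("value")
          .build();
}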

I'm experimenting with ideas on how to work around this but a fix will
likely require a Calcite upgrade, which is not something I'd have time
to help with. (I'm not on the Google Beam team anymore.)

Andrew

On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
 wrote:
>
> Hi @Andrew Pilloud
>
> Sorry for the late response. Yes, your test is working fine. I changed the
> test input structure to match our input structure. Now this test also has
> the same exception.
>
> Feb 21, 2023 2:02:28 PM 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: SQL:
> WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`, 
> `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT 
> `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> FROM `tempTable` AS `tempTable`)
> Feb 21, 2023 2:02:28 PM 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: SQLPlan>
> LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
>   LogicalFilter(condition=[=($0.name, 'innerStr')])
> LogicalProject(name=[$0.name], id=[$1], value=[$2])
>   BeamIOSourceRel(table=[[beam, panwRowTestTable]])
>
>
> fieldList must not be null, type = VARCHAR
> java.lang.AssertionError: fieldList must not be null, type = VARCHAR
>
> I don't know what is different from yours. I am also sharing my version of
> the test.
>
>
> Index: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> ===
> diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java (date 1677017032324)
> @@ -54,6 +54,9 @@
>    private static final Schema innerRowSchema =
>        Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
>
> +  private static final Schema innerPanwRowSchema =
> +      Schema.builder().addStringField("name").build();
> +
>    private static final Schema innerRowWithArraySchema =
>        Schema.builder()
>            .addStringField("string_field")
> @@ -127,8 +130,12 @@
>                .build()))
>            .put(
>                "basicRowTestTable",
> -              TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
> -                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> +              TestBoundedTable.of(FieldType.row(innerRowSchema), "col", FieldType.INT64, "field")
> +                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L))
> +          .put(
> +              "panwRowTestTable",
> +              TestBoundedTable.of(FieldType.row(innerPanwRowSchema), "user_info", FieldType.INT64, "id", FieldType.STRING, "value")
> +                  .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, "some_value"))
>            .put(
>                "rowWithArrayTestTable",
>                TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
> @@ -219,6 +226,21 @@
>                .build());
>      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
>    }
> +
> +  @Test
> +  public void testBasicRowWhereField() {
> +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> +    PCollection stream =
> +        BeamSqlRelUtils.toPCollection(
> +            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM panwRowTestTable WHERE panwRowTestTable.`user_info`.`name` = 'innerStr') SELECT * FROM tempTable"));
> +    Schema outputSchema = Schema.builder().addRowField("col", innerRowSchema).addInt64Field("field").build();
> +    PAssert.that(stream)
> +        .containsInAnyOrder(
> +            Row.withSchema(outputSchema)
> +                .addValues(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L)
> +                .build());
> +    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> +  }
>
>    @Test

Re: Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
Thank you Ankur,

This is the current source code of the Deduplicate transform.

  Boolean seen = seenState.read();
  // Seen state is either set or not set so if it has been set then it must be true.
  if (seen == null) {
    // We don't want the expiry timer to hold up watermarks.
    expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
    seenState.write(true);
    receiver.output(element);
  }

Could you please explain the synchronization for the following scenario?

   - There are two workers.
   - Both workers read the same state at the same time and the state was
   not set yet. In this case, both will get null in the response (I believe)
   - Both of them will try to set the state and send the output out.

What will happen in this scenario?

Thank you
-Binh

On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka  wrote:

> Hi Binh, the Deduplicate transform uses the state API to do the
> de-duplication, which takes care of what is needed for it to work
> across multiple concurrent workers.
>
> Thanks,
> Ankur
>
> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>
>> Hi,
>>
>> I am writing a pipeline and want to apply deduplication. I looked at
>> the Deduplicate transform that Beam provides and wonder about its usage.
>> Do I need to shuffle the input collection by key before calling this
>> transform? I looked at its source code and it doesn't do any shuffle, so
>> I wonder how it works when, say, there are duplicates and the duplicated
>> elements are processed concurrently on multiple workers.
>>
>> Thank you
>> -Binh
>>
>


Re: Deduplicate usage

2023-03-02 Thread Ankur Goenka
Hi Binh, the Deduplicate transform uses the state API to do the de-duplication,
which takes care of what is needed for it to work across multiple concurrent
workers.

Thanks,
Ankur
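
As a usage note that goes beyond the thread: if whole-element equality is not
the identity you want, you can key the elements yourself and let Deduplicate
work on the keys — a sketch, assuming the id is the first comma-separated
field of each record:

import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class DedupByIdSketch {
  // Key each record by an id, then deduplicate per key; the state API keeps
  // one "seen" cell per id, wherever the duplicates arrive.
  static PCollection<KV<String, String>> dedupById(PCollection<String> records) {
    return records
        .apply(WithKeys.of((String r) -> r.split(",", 2)[0])  // assumed id position
            .withKeyType(TypeDescriptors.strings()))
        .apply(Deduplicate.keyedValues());
  }
}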

On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:

> Hi,
>
> I am writing a pipeline and want to apply deduplication. I looked at
> the Deduplicate transform that Beam provides and wonder about its usage.
> Do I need to shuffle the input collection by key before calling this
> transform? I looked at its source code and it doesn't do any shuffle, so
> I wonder how it works when, say, there are duplicates and the duplicated
> elements are processed concurrently on multiple workers.
>
> Thank you
> -Binh
>


Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
Hi,

I am writing a pipeline and want to apply deduplication. I looked at
the Deduplicate transform that Beam provides and wonder about its usage.
Do I need to shuffle the input collection by key before calling this
transform? I looked at its source code and it doesn't do any shuffle, so
I wonder how it works when, say, there are duplicates and the duplicated
elements are processed concurrently on multiple workers.

Thank you
-Binh