Hi Rahul,
What version of Beam are you using? There was a bug [1], fixed in
2.14.0, that could cause what you are observing.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-7269
On 8/9/19 10:35 AM, rahul patwari wrote:
Hi Robert,
When the PCollection is created using
"Create.of(listOfRow).withCoder(RowCoder.of(schema))", I get
inconsistent results.
By "inconsistent", I mean that the result is incorrect some of the
time (most of the time, in fact).
By "incorrect", I mean that elements are missing. The elements are
not duplicated, and they are not just batched differently.
I used System.identityHashCode(this) to convert the PCollection<Row>
into a PCollection<KV<Integer, Row>> so that I could apply the stateful
ParDo (GroupIntoBatches), as per your suggestion in this thread
<https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>
To verify the result, I used GroupByKey, which should give the
same result as GroupIntoBatches *for my case*.
However, when the PCollection is created using "Create.of(listOfRow)"
without a coder, the results are always correct.
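The GroupByKey cross-check described above boils down to one property: for every key, the batches emitted by GroupIntoBatches, flattened together, should contain exactly the elements that GroupByKey places in that key's group, and no batch should exceed the batch size. Below is a minimal stdlib-only Java sketch of that comparison. It is not Beam code: the class and method names (BatchCheck, batchesMatchGroups) are hypothetical, and it assumes both outputs have already been collected into in-memory maps keyed by the same keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Beam API: compares per-key batches (as GroupIntoBatches
// would emit them) against per-key groups (as GroupByKey would emit them).
public class BatchCheck {

    // Counts occurrences of each element, so duplicates are caught as well as gaps.
    public static <T> Map<T, Integer> counts(List<T> items) {
        Map<T, Integer> result = new HashMap<>();
        for (T item : items) {
            result.merge(item, 1, Integer::sum);
        }
        return result;
    }

    // True if every key has the same elements in both outputs (order ignored)
    // and no batch holds more than batchSize elements.
    public static <K, T> boolean batchesMatchGroups(
            Map<K, List<List<T>>> batches, Map<K, List<T>> groups, int batchSize) {
        if (!batches.keySet().equals(groups.keySet())) {
            return false; // a key is missing from one of the outputs
        }
        for (Map.Entry<K, List<T>> entry : groups.entrySet()) {
            List<T> flattened = new ArrayList<>();
            for (List<T> batch : batches.get(entry.getKey())) {
                if (batch.size() > batchSize) {
                    return false; // batch exceeds the configured size
                }
                flattened.addAll(batch);
            }
            if (!counts(flattened).equals(counts(entry.getValue()))) {
                return false; // elements missing or duplicated for this key
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups = Map.of(1, List.of("a", "b", "c"));
        Map<Integer, List<List<String>>> complete =
                Map.of(1, List.of(List.of("a", "b"), List.of("c")));
        Map<Integer, List<List<String>>> missing =
                Map.of(1, List.of(List.of("a", "b")));
        System.out.println(batchesMatchGroups(complete, groups, 2)); // prints true
        System.out.println(batchesMatchGroups(missing, groups, 2));  // prints false
    }
}
```

Counting occurrences rather than comparing sets means a duplicated element is flagged as well as a missing one, and the comparison deliberately ignores element order within and across batches.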
Regards,
Rahul
On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw wrote:
Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?
On Fri, Aug 9, 2019 at 2:18 AM rahul patwari wrote:
>
> I have only run it on the Direct runner. I will run it on other runners and let you know the results.
> I am not setting --streaming when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik wrote:
>>
>> Have you tried running this on more than one runner (e.g. Dataflow, Flink, Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using the GroupIntoBatches PTransform.
>>> I am using the Create.of() PTransform to create a PCollection from in-memory data. The issue occurs when a coder is given to Create.of().
>>> If the coder is not provided, the results are consistent and correct (maybe this is just a coincidence and the problem is somewhere else).
>>> If the batch size is 1, the results are always consistent.
>>>
>>> I am not sure whether this is an issue with serialization/deserialization, GroupIntoBatches, or the Create.of() PTransform.
>>>
>>> The Java code, the expected (correct) results, and the inconsistent results are available at https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul