Hi Rahul,
I cannot tell for sure. The fix was applied in runners-core, so,
technically, multiple runners could have been affected. A runner would
be affected if and only if it uses something that depends on the
hashCode() of StateTag (or StateSpec) and the user uses a Coder for
that state that doesn't correctly implement hashCode() and equals().
SchemaCoder is one such example.
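To illustrate the mechanism described above, here is a minimal,
self-contained sketch. It does not use Beam's actual classes: Tag,
IdentityCoder and ValueCoder are hypothetical stand-ins for a state tag
whose hashCode()/equals() include its coder, a coder that inherits
identity-based equals()/hashCode() from Object, and a coder that
implements them by value. The assumption is that the tag used to read
state is a logically identical but distinct instance from the one used
to write it (for example after the coder has been recreated).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class StateTagHashDemo {

  /** Hypothetical coder that does NOT override equals()/hashCode(). */
  static final class IdentityCoder {
    final String schema;
    IdentityCoder(String schema) { this.schema = schema; }
  }

  /** Hypothetical coder with value-based equals()/hashCode(). */
  static final class ValueCoder {
    final String schema;
    ValueCoder(String schema) { this.schema = schema; }
    @Override public boolean equals(Object o) {
      return o instanceof ValueCoder && schema.equals(((ValueCoder) o).schema);
    }
    @Override public int hashCode() { return schema.hashCode(); }
  }

  /** Hypothetical state tag whose identity includes its coder. */
  static final class Tag {
    final String id;
    final Object coder;
    Tag(String id, Object coder) { this.id = id; this.coder = coder; }
    @Override public boolean equals(Object o) {
      return o instanceof Tag
          && id.equals(((Tag) o).id)
          && coder.equals(((Tag) o).coder);
    }
    @Override public int hashCode() { return Objects.hash(id, coder); }
  }

  /** Writes under one tag, then looks up with a logically identical tag. */
  static boolean foundAfterRecreation(Object coderForWrite, Object coderForRead) {
    Map<Tag, String> state = new HashMap<>();
    state.put(new Tag("batch", coderForWrite), "buffered elements");
    return state.containsKey(new Tag("batch", coderForRead));
  }

  public static void main(String[] args) {
    // Two distinct coder instances without equals(): the lookup misses.
    System.out.println(
        foundAfterRecreation(new IdentityCoder("row"), new IdentityCoder("row"))); // false
    // Two distinct coder instances with value equality: the lookup succeeds.
    System.out.println(
        foundAfterRecreation(new ValueCoder("row"), new ValueCoder("row"))); // true
  }
}
```

In this sketch the first lookup misses because the two coder instances
are never equal, which is one way previously stored state could appear
to be missing. Whether a given runner hits this path depends on how it
keys its state, as discussed in this message.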
After a few greps through the repository, I think the Dataflow runner
might be (more or less) affected by this as well (but someone from the
Dataflow team can confirm or refute that better than I can). The
possibly affected code is in WindmillStateReader.java, which uses a
ConcurrentHashMap with StateTag as the key. I'm not able to tell what
the consequences of that are. I didn't find any obvious uses of HashMap
or HashSet of StateTags in other runners, but that doesn't mean there
really aren't any. :-)
Either way, by using version 2.14.0 you should be safe on all runners.
Jan
On 8/9/19 10:59 AM, rahul patwari wrote:
Hi Jan,
I was using Beam 2.13.0. I have upgraded to Beam 2.14.0 and
the results are now always correct. No more inconsistencies.
Does BEAM-7269 affect all the runners?
Thanks,
Rahul
On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Rahul,
What version of Beam are you using? There was a bug [1] that was
fixed in 2.14.0. This bug could cause what you observe.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-7269
On 8/9/19 10:35 AM, rahul patwari wrote:
Hi Robert,
When the PCollection is created using
"Create.of(listOfRow).withCoder(RowCoder.of(schema))", I am
getting "Inconsistent" results.
By "Inconsistent", I mean that the result is "Incorrect"
sometimes (most of the time).
By "Incorrect" result, I mean that elements are missing. The
elements are not duplicated, and they are not batched
differently.
I have used System.identityHashCode(this) to convert
PCollection<Row> to PCollection<KV<Integer, Row>> in order to apply
a stateful ParDo (GroupIntoBatches), as per your suggestion in this
thread
<https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>
To verify the result, I have used GroupByKey, which should give
the same result as GroupIntoBatches *for my case*.
However, when the PCollection is created using
"Create.of(listOfRow)", the results are always correct.
Regards,
Rahul
On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw
<rober...@google.com> wrote:
Could you clarify what you mean by "inconsistent" and "incorrect"?
Are elements missing/duplicated, or just batched differently?
On Fri, Aug 9, 2019 at 2:18 AM rahul patwari
<rahulpatwari8...@gmail.com> wrote:
>
> I only ran it on the Direct runner. I will run it on other runners
> and let you know the results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com> wrote:
>>
>> Have you tried running this on more than one runner (e.g.
>> Dataflow, Flink, Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari
>> <rahulpatwari8...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using the
>>> GroupIntoBatches PTransform.
>>> I am using the Create.of() PTransform to create a PCollection
>>> from in-memory data. When a coder is given to the Create.of()
>>> PTransform, I am facing the issue.
>>> If the coder is not provided, the results are consistent
>>> and correct (maybe this is just a coincidence and the problem
>>> is somewhere else).
>>> If the batch size is 1, the results are always consistent.
>>>
>>> Not sure if this is an issue with
>>> Serialization/Deserialization (or) GroupIntoBatches (or)
>>> Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent
>>> results are available at
>>> https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul