Hi Rahul,

I cannot tell for sure. The fix was applied at runners-core, so - technically - it was possible that multiple runners were affected. A runner would be affected, if and only if, it would use something that depends on hashCode() of StateTag (or StateSpec) and user would use a Coder for that state that doesn't correctly implement hashCode() and equals() - SchemaCoder is one of such example.

After a few greps on the repository, I think that it might be possible, that Dataflow runner would be (more or less) affected by this as well (but someone from Dataflow team might confirm or disprove that better than me). Possibly affected code is at WindmillStateReader.java, which uses ConcurrentHashMap with StateTag as key. I'm not able to tell the consequences of that. I didn't find any obvious uses of HashMap or HashSet of StateTags in other runners. But that doesn't mean, that there really isn't any. :-)

Either way, by using version 2.14.0 you should be safe on all runners.

Jan

On 8/9/19 10:59 AM, rahul patwari wrote:
Hi Jan,

I was using Beam 2.13.0. I have upgraded Beam version to 2.14.0 and the results are always correct. No more inconsistencies.

Does BEAM-7269 affect all the runners?

Thanks,
Rahul

On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi Rahul,

    what version of Beam are you using? There was a bug [1], which was
    fixed in 2.14.0. This bug could cause what you observe.

    Jan

    [1] https://issues.apache.org/jira/browse/BEAM-7269

    On 8/9/19 10:35 AM, rahul patwari wrote:
    Hi Robert,

    When PCollection is created using
    "Create.of(listOfRow)*.withCoder(RowCoder.of(schema))*", I am
    getting "Inconsistent" results.
    By "Inconsistent", I mean that the result is "Incorrect"
    sometimes(most of the times).
    By "Incorrect" result, I mean that the elements are missing. The
    elements are not duplicated. The elements are not batched
    differently.

    I have used System.identityHashcode(this) to convert
    PCollection<Row> to PCollection<KV<Integer, Row>> to apply
    Stateful Pardo(GroupIntoBatches) as per your suggestion in this
    thread
    
<https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>

    To verify the result, I have used GroupByKey, which should give
    the same result as GroupIntoBatches *for my case*.

    However, When PCollection is created using
    "Create.of(listOfRow)", the results are always correct.

    Regards,
    Rahul

    On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw
    <rober...@google.com <mailto:rober...@google.com>> wrote:

        Could you clarify what you mean by "inconsistent" and
        "incorrect"? Are
        elements missing/duplicated, or just batched differently?

        On Fri, Aug 9, 2019 at 2:18 AM rahul patwari
        <rahulpatwari8...@gmail.com
        <mailto:rahulpatwari8...@gmail.com>> wrote:
        >
        > I only ran in Direct runner. I will run in other runners
        and let you know the results.
        > I am not setting "streaming" when executing.
        >
        > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com
        <mailto:lc...@google.com>> wrote:
        >>
        >> Have you tried running this on more than one runner (e.g.
        Dataflow, Flink, Direct)?
        >>
        >> Are you setting --streaming when executing?
        >>
        >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari
        <rahulpatwari8...@gmail.com
        <mailto:rahulpatwari8...@gmail.com>> wrote:
        >>>
        >>> Hi,
        >>>
        >>> I am getting inconsistent results when using
        GroupIntoBatches PTransform.
        >>> I am using Create.of() PTransform to create a PCollection
        from in-memory. When a coder is given with Create.of()
        PTransform, I am facing the issue.
        >>> If the coder is not provided, the results are consistent
        and correct(Maybe this is just a coincidence and the problem
        is at some other place).
        >>> If Batch Size is 1, results are always consistent.
        >>>
        >>> Not sure if this is an issue with
        Serialization/Deserialization (or) GroupIntoBatches (or)
        Create.of() PTransform.
        >>>
        >>> The Java code, expected correct results, and inconsistent
        results are available at
        https://github.com/rahul8383/beam-examples
        >>>
        >>> Thanks,
        >>> Rahul

Reply via email to