Hi Rahul,

What version of Beam are you using? There was a bug [1], fixed in 2.14.0, that could cause what you observe.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269

On 8/9/19 10:35 AM, rahul patwari wrote:
Hi Robert,

When the PCollection is created using "Create.of(listOfRow).withCoder(RowCoder.of(schema))", I get inconsistent results. By "inconsistent", I mean that the result is incorrect most of the time. By "incorrect", I mean that elements are missing. The elements are not duplicated, and they are not batched differently.

I have used System.identityHashCode(this) to convert the PCollection<Row> to a PCollection<KV<Integer, Row>> so that I can apply a stateful ParDo (GroupIntoBatches), as per your suggestion in this thread <https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>. To verify the result, I have used GroupByKey, which should give the same result as GroupIntoBatches *for my case*.

However, when the PCollection is created using "Create.of(listOfRow)", the results are always correct.
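
One way to check for missing elements independently of Beam is to flatten the batches emitted for a key and compare them against the input list as a multiset. The following is only a minimal sketch in plain Java: the class and method names (BatchCheck, countElements, missingElements) are hypothetical and not part of Beam, strings stand in for Row, and the element type is assumed to implement equals/hashCode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Beam API.
final class BatchCheck {

    // Counts how often each element occurs across all batches (a multiset).
    static <T> Map<T, Integer> countElements(List<List<T>> batches) {
        Map<T, Integer> counts = new HashMap<>();
        for (List<T> batch : batches) {
            for (T element : batch) {
                counts.merge(element, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Returns the expected elements that do not appear in the batches,
    // taking multiplicity into account.
    static <T> List<T> missingElements(List<T> expected, List<List<T>> batches) {
        Map<T, Integer> remaining = countElements(batches);
        List<T> missing = new ArrayList<>();
        for (T element : expected) {
            Integer left = remaining.get(element);
            if (left == null || left == 0) {
                missing.add(element);
            } else {
                remaining.put(element, left - 1);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumed sample data: five inputs, but the batches only cover four.
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d"));
        System.out.println(missingElements(input, batches)); // prints [e]
    }
}
```

An empty result means every input element was emitted in some batch. The same comparison could be applied to the values collected from the GroupByKey branch to confirm that both branches agree.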

Regards,
Rahul

On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw <rober...@google.com> wrote:

    Could you clarify what you mean by "inconsistent" and "incorrect"? Are
    elements missing/duplicated, or just batched differently?

    On Fri, Aug 9, 2019 at 2:18 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
    >
    > I have only run it on the Direct runner. I will run it on other runners and let you know the results.
    > I am not setting "streaming" when executing.
    >
    > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com> wrote:
    >>
    >> Have you tried running this on more than one runner (e.g. Dataflow, Flink, Direct)?
    >>
    >> Are you setting --streaming when executing?
    >>
    >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
    >>>
    >>> Hi,
    >>>
    >>> I am getting inconsistent results when using the GroupIntoBatches PTransform.
    >>> I am using the Create.of() PTransform to create a PCollection from in-memory data. When a coder is given to Create.of(), I face the issue.
    >>> If the coder is not provided, the results are consistent and correct (maybe this is just a coincidence and the problem is somewhere else).
    >>> If the batch size is 1, the results are always consistent.
    >>>
    >>> I am not sure whether this is an issue with serialization/deserialization, GroupIntoBatches, or the Create.of() PTransform.
    >>>
    >>> The Java code, expected correct results, and inconsistent results are available at https://github.com/rahul8383/beam-examples
    >>>
    >>> Thanks,
    >>> Rahul
