oh I just realized I responded to a different thread, feel free to ignore
me.

On Tue, Mar 22, 2022 at 5:34 PM Niel Markwick <ni...@google.com> wrote:

> yeah there does seem to be some heisenbug attributes...
> It failed on 4 out of 6 runs with this reproducer, and always succeeded
> with DirectRunner or 2.35.0...
> It does seem to depend on the number of elements in the group.
>
>
> On Tue, 22 Mar 2022 at 22:27, Steve Niemitz <sniem...@apache.org> wrote:
>
>> Your email was actually what made me notice this! :D
>>
>> I haven't been able to reproduce the NPE you found (also on 2.37) but
>> that certainly doesn't mean it's not a bug.
>>
>>
>>
>> On Tue, Mar 22, 2022 at 5:23 PM Niel Markwick <ni...@google.com> wrote:
>>
>>> I have also seen this with Java beam 2.36.0 and 2.37.0, again with large
>>> groups...
>>>
>>> The behaviour is that some elements become null, which then causes
>>> either coder problems for the primitive types (as seen in Claire's stack
>>> trace), or downstream issues for custom types which are not expecting null
>>> elements...
>>>
>>> I have a relatively simple Java reproducer creating 200,000 KVs and
>>> doing a CoGroupByKey with a single KV. This runs fine on DirectRunner, and
>>> on Dataflow with 2.35.0, but fails on Dataflow on 2.36.0 and 2.37.0 with
>>> the same error: "org.apache.beam.sdk.coders.CoderException: cannot
>>> encode a null String" in the CoGroupByKey step
>>>
>>>     Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>     String[] bigarray = new String[200000];
>>>     Arrays.fill(bigarray, "Hello World");
>>>     PCollection<KV<Integer, String>> right =
>>>         p.apply(Create.of(Arrays.asList(bigarray)))
>>>             .apply(
>>>                 MapElements.into(new TypeDescriptor<KV<Integer, String>>() {})
>>>                     .via(input -> KV.of(100, input)));
>>>     PCollection<KV<Integer, String>> left =
>>>         p.apply(Create.of(KV.of(100, "goodbye world")));
>>>
>>>     TupleTag<String> leftTag = new TupleTag<String>();
>>>     TupleTag<String> rightTag = new TupleTag<String>();
>>>
>>>     PCollection<KV<Integer, CoGbkResult>> joinedResult =
>>>         KeyedPCollectionTuple.of(leftTag, left)
>>>             .and(rightTag, right)
>>>             .apply("Join By Key", CoGroupByKey.<Integer>create());
>>>
>>>     joinedResult.apply(
>>>         "Report",
>>>         MapElements.into(TypeDescriptor.of(Void.class))
>>>             .via(
>>>                 (KV<Integer, CoGbkResult> element) -> {
>>>                   Iterable<String> leftValues = element.getValue().getAll(leftTag);
>>>                   Iterable<String> rightValues = element.getValue().getAll(rightTag);
>>>
>>>                   System.out.println("Key = " + element.getKey());
>>>                   System.out.println("left size= " + Iterables.size(leftValues));
>>>                   System.out.println("right size= " + Iterables.size(rightValues));
>>>                   return (Void) null;
>>>                 }));
>>>
>>>     p.run().waitUntilFinish();
>>>
>>>
>>> On Fri, 18 Mar 2022, 16:01 Reuven Lax, <re...@google.com> wrote:
>>>
>>>>
>>>> On Fri, Mar 18, 2022, 8:28 AM Claire McGinty <
>>>> claire.d.mcgi...@gmail.com> wrote:
>>>>
>>>>> To add more investigative context, we discovered that these jobs
>>>>> succeed when run with Dataflow Prime; it's just Beam 2.36.0 + Dataflow V1
>>>>> that produces this regression. Unfortunately it's not feasible to upgrade
>>>>> our whole fleet to Dataflow Prime right now, so we've created an internal
>>>>> Google support ticket for this, too.
>>>>>
>>>>> - Claire
>>>>>
>>>>> On Tue, Mar 15, 2022 at 12:22 PM Claire McGinty <
>>>>> claire.d.mcgi...@gmail.com> wrote:
>>>>>
>>>>>> Hi! No, there aren't null strings in the data--it would have thrown a
>>>>>> NPE much earlier if that were the case, and likely failed when run with
>>>>>> Beam 2.35.0 also, unless null-handling changed too?
>>>>>>
>>>>>> - Claire
>>>>>>
>>>>>> On Tue, Mar 15, 2022 at 12:19 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Is it expected to have null strings in your data?
>>>>>>>
>>>>>>> On Tue, Mar 15, 2022 at 9:06 AM Claire McGinty <
>>>>>>> claire.d.mcgi...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Beam devs!
>>>>>>>>
>>>>>>>> I wanted to surface a possible regression with CoGroupByKeys in
>>>>>>>> this transform. Our organization (which primarily uses Scio) has had
>>>>>>>> several pipelines start failing after upgrading from Beam 2.35.0 to
>>>>>>>> Beam 2.36.0, specifically during cogroup operations with large
>>>>>>>> (>10,000) key groups.
>>>>>>>>
>>>>>>>> We initially saw this problem using Scio join
>>>>>>>> <https://github.com/spotify/scio/blob/be7166c58450bd59ae5d0048d39ef3ea0d5ed107/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala#L84-L112>
>>>>>>>> operations, but I was able to reproduce using public datasets & Beam
>>>>>>>> PTransforms directly (job code link
>>>>>>>> <https://github.com/clairemcginty/beam-test/blob/main/src/main/scala/clairem/data/JoinTest.scala#L32>;
>>>>>>>> the repo README includes instructions to run it if anyone is
>>>>>>>> interested). This job throws the following error when run in Dataflow
>>>>>>>> with Beam 2.36.0:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unable to encode element
>>>>>>>> '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb,
>>>>>>>> org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]'
>>>>>>>> with coder
>>>>>>>> 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
>>>>>>>> org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>>>>>> org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>>>>>> org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
>>>>>>>> org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
>>>>>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
>>>>>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
>>>>>>>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
>>>>>>>> org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>>>>>>>> org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
>>>>>>>> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
>>>>>>>> org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
>>>>>>>>
>>>>>>>> To troubleshoot, I first tried downgrading to Beam 2.35.0; the job
>>>>>>>> then ran successfully. Next, I tried running the job using Beam
>>>>>>>> 2.36.0, but with 2.35.0's version of CoGbkResult
>>>>>>>> <https://github.com/apache/beam/blob/v2.35.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java>
>>>>>>>> on the classpath; it ran successfully then, too. I'm wondering if it's
>>>>>>>> related to the lazy-iterator improvements introduced in BEAM-13541
>>>>>>>> <https://github.com/apache/beam/pull/16354>? I wasn't able to
>>>>>>>> reproduce it with a unit test yet, unfortunately; maybe it's unique to
>>>>>>>> the way Dataflow lazily loads large CoGbkResults.
>>>>>>>>
>>>>>>>> I'm continuing to try to reproduce this on a smaller scale, but
>>>>>>>> wanted to flag it here for now; it's preventing our pipeline fleet from
>>>>>>>> upgrading to Beam 2.36.0. Any insight would be appreciated!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Claire
>>>>>>>>
