(Forgot to add, this was with the transform coder wrapped with NullableCoder so it didn’t fail outright.)
- Claire

On Tue, Mar 22, 2022 at 11:26 PM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:

I was wondering if there's any difference in how Dataflow v1 vs. v2 loads CoGbkResult iterables. In one of our internal pipelines that started failing, I added some logging to CoGbkResult and found that the TagIterable's iterator would return hasNext() == true, but next() == null.

- Claire

On Tue, Mar 22, 2022 at 6:13 PM Robert Bradshaw <rober...@google.com> wrote:

BEAM-13541 is, as far as I'm aware, the only major change that has happened in this code recently, but this should be totally agnostic of Dataflow v1 vs. v2 vs. DirectRunner. Have there been any changes to the shuffle/reiterable code?

On Tue, Mar 22, 2022 at 2:37 PM Steve Niemitz <sniem...@apache.org> wrote:

Oh, I just realized I responded to a different thread; feel free to ignore me.

On Tue, Mar 22, 2022 at 5:34 PM Niel Markwick <ni...@google.com> wrote:

Yeah, there do seem to be some heisenbug attributes... It failed on 4 out of 6 runs with this reproducer, and always succeeded with DirectRunner or 2.35.0. It does seem to depend on the number of elements in the group.

On Tue, 22 Mar 2022 at 22:27, Steve Niemitz <sniem...@apache.org> wrote:

Your email was actually what made me notice this! :D

I haven't been able to reproduce the NPE you found (also on 2.37), but that certainly doesn't mean it's not a bug.

On Tue, Mar 22, 2022 at 5:23 PM Niel Markwick <ni...@google.com> wrote:

I have also seen this with Java Beam 2.36.0 and 2.37.0, again with large groups.

The behaviour is that some elements become null, which then causes either coder problems for primitive types (as seen in Claire's stack trace) or downstream issues for custom types that don't expect null elements.
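One way to confirm the hasNext() == true / next() == null symptom directly, rather than inferring it from a coder failure, is to walk the grouped values once and record where the nulls appear. The sketch below is a hypothetical stdlib-only helper (`NullElementScan` and `nullPositions` are illustrative names, not Beam APIs), and it assumes the values can be iterated inside the consuming step.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical diagnostic helper (not part of Beam): walks an Iterable once and
 * records every position where the iterator reported hasNext() == true but
 * next() then returned null.
 */
final class NullElementScan {
  private NullElementScan() {}

  /** Returns the 0-based positions of null elements, in iteration order. */
  static List<Long> nullPositions(Iterable<?> values) {
    List<Long> positions = new ArrayList<>();
    long index = 0;
    Iterator<?> it = values.iterator();
    while (it.hasNext()) {
      // hasNext() promised an element; a null here is the suspicious case.
      if (it.next() == null) {
        positions.add(index);
      }
      index++;
    }
    return positions;
  }
}
```

A plain element count such as Guava's `Iterables.size` counts null elements like any other, so a size check alone will not reveal the problem; logging `nullPositions(rightValues)` in a reporting step like the one in Niel's reproducer would show whether, and where, nulls appear.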
I have a relatively simple Java reproducer that creates 200,000 KVs under a single key and does a CoGroupByKey with a PCollection containing a single KV for that key. This runs fine on DirectRunner and on Dataflow with 2.35.0, but fails on Dataflow with 2.36.0 and 2.37.0 with the same error in the CoGroupByKey step: "org.apache.beam.sdk.coders.CoderException: cannot encode a null String"

    import com.google.common.collect.Iterables; // Guava
    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TypeDescriptor;

    // Body of main(String[] args):
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Right side: 200,000 copies of the same string, all under key 100.
    String[] bigarray = new String[200000];
    Arrays.fill(bigarray, "Hello World");
    PCollection<KV<Integer, String>> right =
        p.apply(Create.of(Arrays.asList(bigarray)))
            .apply(
                MapElements.into(new TypeDescriptor<KV<Integer, String>>() {})
                    .via(input -> KV.of(100, input)));

    // Left side: a single KV for the same key.
    PCollection<KV<Integer, String>> left = p.apply(Create.of(KV.of(100, "goodbye world")));

    TupleTag<String> leftTag = new TupleTag<String>();
    TupleTag<String> rightTag = new TupleTag<String>();

    PCollection<KV<Integer, CoGbkResult>> joinedResult =
        KeyedPCollectionTuple.of(leftTag, left)
            .and(rightTag, right)
            .apply("Join By Key", CoGroupByKey.<Integer>create());

    // Print the size of each side of the join for the single key.
    joinedResult.apply(
        "Report",
        MapElements.into(TypeDescriptor.of(Void.class))
            .via(
                (KV<Integer, CoGbkResult> element) -> {
                  Iterable<String> leftValues = element.getValue().getAll(leftTag);
                  Iterable<String> rightValues = element.getValue().getAll(rightTag);

                  System.out.println("Key = " + element.getKey());
                  System.out.println("left size= " + Iterables.size(leftValues));
                  System.out.println("right size= " + Iterables.size(rightValues));
                  return (Void) null;
                }));

    p.run().waitUntilFinish();

On Fri, 18 Mar 2022, 16:01 Reuven Lax, <re...@google.com> wrote:

On Fri, Mar 18, 2022, 8:28 AM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:

To add more investigative context, we discovered that these jobs succeed when run with Dataflow Prime; it's just Beam 2.36.0 + Dataflow V1 that produces this regression. Unfortunately, it's not feasible to upgrade our whole fleet to Dataflow Prime right now, so we've created an internal Google support ticket for this, too.

- Claire

On Tue, Mar 15, 2022 at 12:22 PM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:

Hi! No, there aren't null strings in the data; it would have thrown an NPE much earlier if that were the case, and would likely have failed when run with Beam 2.35.0 too, unless null handling changed as well?

- Claire

On Tue, Mar 15, 2022 at 12:19 PM Reuven Lax <re...@google.com> wrote:

Is it expected to have null strings in your data?

On Tue, Mar 15, 2022 at 9:06 AM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:

Hi Beam devs!

I wanted to surface a possible regression in CoGroupByKey. Our organization (which primarily uses Scio) has had several pipelines start failing after upgrading from Beam 2.35.0 to Beam 2.36.0, specifically during cogroup operations with large key groups (more than 10,000 elements).

We initially saw this problem using Scio join operations, but I was able to reproduce it using public datasets and Beam PTransforms directly (job code link; the repo README includes instructions to run it if anyone is interested). This job throws the following error when run on Dataflow with Beam 2.36.0:

    Caused by: java.lang.IllegalArgumentException: Unable to encode element '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb, org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]' with coder 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
        org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
        org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
    Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
        org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)

To troubleshoot, I first tried downgrading to Beam 2.35.0; the job then ran successfully. Next, I tried running the job using Beam 2.36.0, but with 2.35.0's version of CoGbkResult on the classpath; it ran successfully then, too. I'm wondering if it's related to the lazy-iterator improvements introduced in BEAM-13541? I haven't been able to reproduce it with a unit test yet, unfortunately; maybe it's unique to the way Dataflow lazily loads large CoGbkResults.

I'm continuing to try to reproduce this on a smaller scale, but wanted to flag it here for now; it's preventing our pipeline fleet from upgrading to Beam 2.36.0. Any insight would be appreciated!

Thanks,
Claire
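The innermost cause in the trace above is StringUtf8Coder refusing to encode null, and the NullableCoder workaround mentioned at the top of the thread avoids that by writing a presence marker before each value. The stdlib-only model below is an illustrative assumption, not Beam code: the class `NullableStringModel` is hypothetical, and its 4-byte length prefix is a simplification of the real string encoding. Only the idea of a null/present marker byte ahead of the inner encoding is meant to mirror NullableCoder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical stdlib model (not Beam code) of a strict string coder that
 * rejects null, plus a nullable wrapper that prefixes a marker byte.
 */
final class NullableStringModel {
  private static final int ENCODE_NULL = 0;
  private static final int ENCODE_PRESENT = 1;

  private NullableStringModel() {}

  /** Strict coder: fails on null, like "cannot encode a null String". */
  static void encodeStrict(String value, DataOutputStream out) throws IOException {
    if (value == null) {
      throw new IOException("cannot encode a null String");
    }
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length); // simplified length prefix (assumption)
    out.write(bytes);
  }

  static String decodeStrict(DataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Nullable wrapper: marker byte, then the strict encoding only if present. */
  static byte[] encodeNullable(String value) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    if (value == null) {
      out.writeByte(ENCODE_NULL);
    } else {
      out.writeByte(ENCODE_PRESENT);
      encodeStrict(value, out);
    }
    out.flush();
    return buffer.toByteArray();
  }

  static String decodeNullable(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int marker = in.readUnsignedByte();
    if (marker == ENCODE_NULL) {
      return null;
    }
    if (marker != ENCODE_PRESENT) {
      throw new IOException("unexpected marker byte: " + marker);
    }
    return decodeStrict(in);
  }
}
```

This also explains why the NullableCoder run did not fail outright: a null produced by the iterable is encoded as the marker byte alone, so it travels downstream instead of stopping the job, and the underlying bug stays hidden until some consumer trips over the null.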