oh I just realized I responded to a different thread, feel free to ignore me.
On Tue, Mar 22, 2022 at 5:34 PM Niel Markwick <ni...@google.com> wrote:
> yeah there does seem to be some heisenbug attributes...
> It failed on 4 out of 6 runs with this reproducer, and always succeeded
> with DirectRunner or 2.35.0...
> It does seem to depend on the number of elements in the group.
>
> On Tue, 22 Mar 2022 at 22:27, Steve Niemitz <sniem...@apache.org> wrote:
>
>> Your email was actually what made me notice this! :D
>>
>> I haven't been able to reproduce the NPE you found (also on 2.37) but
>> that certainly doesn't mean it's not a bug.
>>
>> On Tue, Mar 22, 2022 at 5:23 PM Niel Markwick <ni...@google.com> wrote:
>>
>>> I have also seen this with Java beam 2.36.0 and 2.37.0, again with large
>>> groups...
>>>
>>> The behaviour is that some elements become null, which then causes
>>> either coder problems for the primitive types (as seen in Claire's stack
>>> trace), or downstream issues for custom types which are not expecting null
>>> elements...
>>>
>>> I have a relatively simple Java reproducer creating 200,000 KVs and
>>> doing a CoGroupByKey with a single KV.
>>> This runs fine on DirectRunner, and
>>> on Dataflow with 2.35.0, but fails on Dataflow on 2.36.0 and 2.37.0 with
>>> the same error: "org.apache.beam.sdk.coders.CoderException: cannot
>>> encode a null String" in the CoGroupByKey step
>>>
>>> Pipeline p =
>>>     Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>> String[] bigarray = new String[200000];
>>> Arrays.fill(bigarray, "Hello World");
>>> PCollection<KV<Integer, String>> right =
>>>     p.apply(Create.of(Arrays.asList(bigarray)))
>>>         .apply(
>>>             MapElements.into(new TypeDescriptor<KV<Integer, String>>() {})
>>>                 .via(input -> KV.of(100, input)));
>>> PCollection<KV<Integer, String>> left =
>>>     p.apply(Create.of(KV.of(100, "goodbye world")));
>>>
>>> TupleTag<String> leftTag = new TupleTag<String>();
>>> TupleTag<String> rightTag = new TupleTag<String>();
>>>
>>> PCollection<KV<Integer, CoGbkResult>> joinedResult =
>>>     KeyedPCollectionTuple.of(leftTag, left)
>>>         .and(rightTag, right)
>>>         .apply("Join By Key", CoGroupByKey.<Integer>create());
>>>
>>> joinedResult.apply(
>>>     "Report",
>>>     MapElements.into(TypeDescriptor.of(Void.class))
>>>         .via(
>>>             (KV<Integer, CoGbkResult> element) -> {
>>>               Iterable<String> leftValues = element.getValue().getAll(leftTag);
>>>               Iterable<String> rightValues = element.getValue().getAll(rightTag);
>>>
>>>               System.out.println("Key = " + element.getKey());
>>>               System.out.println("left size= " + Iterables.size(leftValues));
>>>               System.out.println("right size= " + Iterables.size(rightValues));
>>>               return (Void) null;
>>>             }));
>>>
>>> p.run().waitUntilFinish();
>>>
>>> On Fri, 18 Mar 2022, 16:01 Reuven Lax, <re...@google.com> wrote:
>>>
>>>> On Fri, Mar 18, 2022, 8:28 AM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:
>>>>
>>>>> To add more investigative context, we discovered that these jobs
>>>>> succeed when run with Dataflow Prime; it's just Beam 2.36.0 + Dataflow V1
>>>>> that produces this regression.
>>>>> Unfortunately it's not feasible to upgrade
>>>>> our whole fleet to Dataflow Prime right now, so we've created an internal
>>>>> Google support ticket for this, too.
>>>>>
>>>>> - Claire
>>>>>
>>>>> On Tue, Mar 15, 2022 at 12:22 PM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:
>>>>>
>>>>>> Hi! No, there aren't null strings in the data--it would have thrown a
>>>>>> NPE much earlier if that were the case, and likely failed when run with
>>>>>> Beam 2.35.0 also, unless null-handling changed too?
>>>>>>
>>>>>> - Claire
>>>>>>
>>>>>> On Tue, Mar 15, 2022 at 12:19 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Is it expected to have null strings in your data?
>>>>>>>
>>>>>>> On Tue, Mar 15, 2022 at 9:06 AM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Beam devs!
>>>>>>>>
>>>>>>>> I wanted to surface a possible regression with CoGroupByKeys in
>>>>>>>> this transform. Our organization (which primarily uses Scio) has had
>>>>>>>> several pipelines start failing after upgrading from Beam 2.35.0 to
>>>>>>>> Beam 2.36.0, specifically during cogroup operations with large
>>>>>>>> (>10,000) key groups.
>>>>>>>>
>>>>>>>> We initially saw this problem using Scio join
>>>>>>>> <https://github.com/spotify/scio/blob/be7166c58450bd59ae5d0048d39ef3ea0d5ed107/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala#L84-L112>
>>>>>>>> operations, but I was able to reproduce using public datasets & Beam
>>>>>>>> PTransforms directly (job code link
>>>>>>>> <https://github.com/clairemcginty/beam-test/blob/main/src/main/scala/clairem/data/JoinTest.scala#L32>;
>>>>>>>> the repo README includes instructions to run it if anyone is
>>>>>>>> interested).
>>>>>>>> This job throws the following error when run in Dataflow with Beam
>>>>>>>> 2.36.0:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unable to encode element
>>>>>>>> '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb,
>>>>>>>> org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]'
>>>>>>>> with coder
>>>>>>>> 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
>>>>>>>> org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>>>>>> org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>>>>>> org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
>>>>>>>> org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
>>>>>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
>>>>>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
>>>>>>>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
>>>>>>>> org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>>>>>>>> org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
>>>>>>>> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode
>>>>>>>> a null String
>>>>>>>> org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
>>>>>>>>
>>>>>>>> To troubleshoot, I first tried downgrading to Beam 2.35.0; the job
>>>>>>>> then ran successfully. Next, I tried running the job using Beam
>>>>>>>> 2.36.0, but with 2.35.0's version of CoGbkResult
>>>>>>>> <https://github.com/apache/beam/blob/v2.35.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java>
>>>>>>>> on the classpath; it ran successfully then, too. I'm wondering if
>>>>>>>> it's related to the lazy-iterator improvements introduced in
>>>>>>>> BEAM-13541 <https://github.com/apache/beam/pull/16354>? I wasn't
>>>>>>>> able to reproduce it with a unit test yet, unfortunately; maybe it's
>>>>>>>> unique to the way Dataflow lazily-loads large CoGbkResults.
>>>>>>>>
>>>>>>>> I'm continuing to try to reproduce this on a smaller scale, but
>>>>>>>> wanted to flag it here for now; it's preventing our pipeline fleet from
>>>>>>>> upgrading to Beam 2.36.0. Any insight would be appreciated!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Claire
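
For anyone skimming the thread: the innermost cause in the trace is a null reaching a string coder that refuses nulls, even though the input data contains none. The sketch below is a standalone illustration of that failure mode and of a simple way to locate null positions in a grouped iterable before encoding. It has no Beam dependency; `NullElementSketch`, `encodeAll`, and `nullPositions` are hypothetical names made up for this example, and `encodeAll` is only a stand-in for a null-rejecting coder, not Beam's `StringUtf8Coder`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Beam-free illustration; none of these names exist in Beam.
class NullElementSketch {

  // Stand-in for a coder that rejects nulls: encodes each value as UTF-8 and
  // throws as soon as it meets a null. Returns the number of bytes written.
  public static int encodeAll(Iterable<String> values) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String value : values) {
      if (value == null) {
        throw new IOException("cannot encode a null String");
      }
      out.write(value.getBytes(StandardCharsets.UTF_8));
    }
    return out.size();
  }

  // Diagnostic helper: returns the zero-based positions of null elements.
  public static List<Integer> nullPositions(Iterable<String> values) {
    List<Integer> positions = new ArrayList<>();
    int index = 0;
    for (String value : values) {
      if (value == null) {
        positions.add(index);
      }
      index++;
    }
    return positions;
  }

  public static void main(String[] args) {
    // Simulates a group in which one element has unexpectedly become null.
    List<String> group = Arrays.asList("Hello World", null, "Hello World");
    System.out.println("null positions: " + nullPositions(group));
    try {
      encodeAll(group);
    } catch (IOException e) {
      System.out.println("encoding failed: " + e.getMessage());
    }
  }
}
```

A check along the lines of `nullPositions` could be dropped into the "Report" step of the reproducer to log which positions in the large group come back null, which might help narrow down whether the nulls cluster at particular offsets.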