This is a little late, but I wanted to add that I tried re-running one of our failing pipelines with the latest Beam snapshot <https://repository.apache.org/content/repositories/snapshots/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.39.0-SNAPSHOT/> and it looks like Robert's PR solved the issue for us! Thanks for your help. Do you know when Beam 2.38.0 is expected to be released?
- Claire

On Thu, Mar 24, 2022 at 8:31 PM Robert Bradshaw <[email protected]> wrote:

> Excellent. Yes, that could totally explain why this was runner-dependent. I'll try to get this into 2.38.
>
> On Thu, Mar 24, 2022 at 5:23 PM Steve Niemitz <[email protected]> wrote:
> >
> > That fixed it. I assume the issue is that the `assert` isn't run in dataflow v1 (because asserts are disabled?), so maybeAdvance() is never called, but should be?
> >
> > Interestingly enough, IntelliJ actually flags this with a warning:
> > "'assert' has side effects: call to 'maybeAdvance()' mutates field 'next'"
> >
> > On Thu, Mar 24, 2022 at 7:21 PM Robert Bradshaw <[email protected]> wrote:
> >>
> >> I'm still scratching my head over this one, but could you try reproducing with https://github.com/apache/beam/pull/17180 ?
> >>
> >> On Tue, Mar 22, 2022 at 8:31 PM Claire McGinty <[email protected]> wrote:
> >> >
> >> > (Forgot to add, this was with the transform coder wrapped with NullableCoder so it didn’t fail outright.)
> >> >
> >> > - Claire
> >> >
> >> > On Tue, Mar 22, 2022 at 11:26 PM Claire McGinty <[email protected]> wrote:
> >> >>
> >> >> I was wondering if there’s any difference in how Dataflow v1 vs v2 loads CoGbkResult iterables - in one of our internal pipelines that started failing, I added some logging to CoGbkResult and found that the TagIterable’s iterator would return hasNext() == true, but next() == null.
> >> >>
> >> >> - Claire
> >> >>
> >> >> On Tue, Mar 22, 2022 at 6:13 PM Robert Bradshaw <[email protected]> wrote:
> >> >>>
> >> >>> BEAM-13541 is as far as I'm aware the only major change that has happened in this code recently, but this should be totally agnostic of Dataflow v1 vs. v2 vs. DirectRunner. Have there been any changes to the shuffle/reiterable code?
> >> >>>
> >> >>> On Tue, Mar 22, 2022 at 2:37 PM Steve Niemitz <[email protected]> wrote:
> >> >>> >
> >> >>> > oh I just realized I responded to a different thread, feel free to ignore me.
> >> >>> >
> >> >>> > On Tue, Mar 22, 2022 at 5:34 PM Niel Markwick <[email protected]> wrote:
> >> >>> >>
> >> >>> >> yeah there does seem to be some heisenbug attributes...
> >> >>> >> It failed on 4 out of 6 runs with this reproducer, and always succeeded with DirectRunner or 2.35.0...
> >> >>> >> It does seem to depend on the number of elements in the group.
> >> >>> >>
> >> >>> >> On Tue, 22 Mar 2022 at 22:27, Steve Niemitz <[email protected]> wrote:
> >> >>> >>>
> >> >>> >>> Your email was actually what made me notice this! :D
> >> >>> >>>
> >> >>> >>> I haven't been able to reproduce the NPE you found (also on 2.37) but that certainly doesn't mean it's not a bug.
> >> >>> >>>
> >> >>> >>> On Tue, Mar 22, 2022 at 5:23 PM Niel Markwick <[email protected]> wrote:
> >> >>> >>>>
> >> >>> >>>> I have also seen this with Java beam 2.36.0 and 2.37.0, again with large groups...
> >> >>> >>>>
> >> >>> >>>> The behaviour is that some elements become null, which then causes either coder problems for the primitive types (as seen in Claire's stack trace), or downstream issues for custom types which are not expecting null elements...
> >> >>> >>>>
> >> >>> >>>> I have a relatively simple Java reproducer creating 200,000 KVs and doing a CoGroupByKey with a single KV.
> >> >>> >>>> This runs fine on DirectRunner, and on Dataflow with 2.35.0, but fails on Dataflow on 2.36.0 and 2.37.0 with the same error: "org.apache.beam.sdk.coders.CoderException: cannot encode a null String" in the CoGroupByKey step
> >> >>> >>>>
> >> >>> >>>> Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
> >> >>> >>>> String[] bigarray = new String[200000];
> >> >>> >>>> Arrays.fill(bigarray, "Hello World");
> >> >>> >>>> PCollection<KV<Integer, String>> right =
> >> >>> >>>>     p.apply(Create.of(Arrays.asList(bigarray)))
> >> >>> >>>>         .apply(
> >> >>> >>>>             MapElements.into(new TypeDescriptor<KV<Integer, String>>() {})
> >> >>> >>>>                 .via(input -> KV.of(100, input)));
> >> >>> >>>> PCollection<KV<Integer, String>> left = p.apply(Create.of(KV.of(100, "goodbye world")));
> >> >>> >>>>
> >> >>> >>>> TupleTag<String> leftTag = new TupleTag<String>();
> >> >>> >>>> TupleTag<String> rightTag = new TupleTag<String>();
> >> >>> >>>>
> >> >>> >>>> PCollection<KV<Integer, CoGbkResult>> joinedResult =
> >> >>> >>>>     KeyedPCollectionTuple.of(leftTag, left)
> >> >>> >>>>         .and(rightTag, right)
> >> >>> >>>>         .apply("Join By Key", CoGroupByKey.<Integer>create());
> >> >>> >>>>
> >> >>> >>>> joinedResult.apply(
> >> >>> >>>>     "Report",
> >> >>> >>>>     MapElements.into(TypeDescriptor.of(Void.class))
> >> >>> >>>>         .via(
> >> >>> >>>>             (KV<Integer, CoGbkResult> element) -> {
> >> >>> >>>>               Iterable<String> leftValues = element.getValue().getAll(leftTag);
> >> >>> >>>>               Iterable<String> rightValues = element.getValue().getAll(rightTag);
> >> >>> >>>>
> >> >>> >>>>               System.out.println("Key = " + element.getKey());
> >> >>> >>>>               System.out.println("left size= " + Iterables.size(leftValues));
> >> >>> >>>>               System.out.println("right size= " + Iterables.size(rightValues));
> >> >>> >>>>               return (Void) null;
> >> >>> >>>>             }));
> >> >>> >>>>
> >> >>> >>>> p.run().waitUntilFinish();
> >> >>> >>>>
> >> >>> >>>> On Fri, 18 Mar 2022, 16:01 Reuven Lax, <[email protected]> wrote:
> >> >>> >>>>>
> >> >>> >>>>> On Fri, Mar 18, 2022, 8:28 AM Claire McGinty <[email protected]> wrote:
> >> >>> >>>>>>
> >> >>> >>>>>> To add more investigative context, we discovered that these jobs succeed when run with Dataflow Prime; it's just Beam 2.36.0 + Dataflow V1 that produces this regression. Unfortunately it's not feasible to upgrade our whole fleet to Dataflow Prime right now, so we've created an internal Google support ticket for this, too.
> >> >>> >>>>>>
> >> >>> >>>>>> - Claire
> >> >>> >>>>>>
> >> >>> >>>>>> On Tue, Mar 15, 2022 at 12:22 PM Claire McGinty <[email protected]> wrote:
> >> >>> >>>>>>>
> >> >>> >>>>>>> Hi! No, there aren't null strings in the data--it would have thrown an NPE much earlier if that were the case, and likely failed when run with Beam 2.35.0 also, unless null-handling changed too?
> >> >>> >>>>>>>
> >> >>> >>>>>>> - Claire
> >> >>> >>>>>>>
> >> >>> >>>>>>> On Tue, Mar 15, 2022 at 12:19 PM Reuven Lax <[email protected]> wrote:
> >> >>> >>>>>>>>
> >> >>> >>>>>>>> Is it expected to have null strings in your data?
> >> >>> >>>>>>>>
> >> >>> >>>>>>>> On Tue, Mar 15, 2022 at 9:06 AM Claire McGinty <[email protected]> wrote:
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> Hi Beam devs!
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> I wanted to surface a possible regression with CoGroupByKeys in this transform. Our organization (which primarily uses Scio) has had several pipelines start failing after upgrading from Beam 2.35.0 to Beam 2.36.0, specifically during cogroup operations with large (>10,000) key groups.
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> We initially saw this problem using Scio join operations, but I was able to reproduce using public datasets & Beam PTransforms directly (job code link; the repo README includes instructions to run it if anyone is interested).
> >> >>> >>>>>>>>> This job throws the following error when run in Dataflow with Beam 2.36.0:
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unable to encode element '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb, org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]' with coder 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
> >> >>> >>>>>>>>>     org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
> >> >>> >>>>>>>>>     org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
> >> >>> >>>>>>>>>     org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
> >> >>> >>>>>>>>>     org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
> >> >>> >>>>>>>>>     org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
> >> >>> >>>>>>>>>     org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
> >> >>> >>>>>>>>>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
> >> >>> >>>>>>>>>     org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
> >> >>> >>>>>>>>> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
> >> >>> >>>>>>>>>     org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> To troubleshoot, I first tried downgrading to Beam 2.35.0; the job then ran successfully. Next, I tried running the job using Beam 2.36.0, but with 2.35.0's version of CoGbkResult on the classpath; it ran successfully then, too. I'm wondering if it's related to the lazy-iterator improvements introduced in BEAM-13541? I wasn't able to reproduce it with a unit test yet, unfortunately; maybe it's unique to the way Dataflow lazily-loads large CoGbkResults.
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> I'm continuing to try to reproduce this on a smaller scale, but wanted to flag it here for now; it's preventing our pipeline fleet from upgrading to Beam 2.36.0. Any insight would be appreciated!
> >> >>> >>>>>>>>>
> >> >>> >>>>>>>>> Thanks,
> >> >>> >>>>>>>>> Claire
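
Note for readers of this thread: the diagnosis discussed above (the advancing call sat inside an `assert`, so it is skipped whenever assertions are disabled, which is the JVM default) can be illustrated with a small standalone sketch. This is not Beam's CoGbkResult code. `PrefetchingIterator`, `advanceInsideAssert`, `assertionsEnabled`, and `firstElement` are hypothetical names made up for illustration; only `maybeAdvance()` and the `next` field echo the names quoted in the IntelliJ warning.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class AssertSideEffectDemo {

  /** Hypothetical prefetching iterator; an illustration only, not Beam's implementation. */
  static class PrefetchingIterator implements Iterator<String> {
    private final Iterator<String> source;
    private final boolean advanceInsideAssert;
    private String next; // buffered element, or null when nothing is buffered

    PrefetchingIterator(Iterator<String> source, boolean advanceInsideAssert) {
      this.source = source;
      this.advanceInsideAssert = advanceInsideAssert;
    }

    // Buffers one element if none is buffered. Always returns true so it can be asserted.
    private boolean maybeAdvance() {
      if (next == null && source.hasNext()) {
        next = source.next();
      }
      return true;
    }

    @Override
    public boolean hasNext() {
      if (advanceInsideAssert) {
        // Problematic shape: the whole expression is skipped when the JVM runs without -ea.
        assert maybeAdvance();
      } else {
        // Safer shape: the side effect runs unconditionally.
        maybeAdvance();
      }
      return next != null || source.hasNext();
    }

    @Override
    public String next() {
      String result = next; // stays null if maybeAdvance() never ran
      next = null;
      return result;
    }
  }

  /** Reports whether assertions are enabled for this class. */
  public static boolean assertionsEnabled() {
    boolean enabled = false;
    assert enabled = true; // deliberate side effect: executes only under -ea
    return enabled;
  }

  /** Returns the first element seen through the iterator, or null if next() yields null. */
  public static String firstElement(boolean advanceInsideAssert) {
    List<String> data = Arrays.asList("a", "b");
    Iterator<String> it = new PrefetchingIterator(data.iterator(), advanceInsideAssert);
    return it.hasNext() ? it.next() : null;
  }

  public static void main(String[] args) {
    System.out.println("assertions enabled: " + assertionsEnabled());
    System.out.println("advance inside assert:  " + firstElement(true));
    System.out.println("advance outside assert: " + firstElement(false));
  }
}
```

Run without `-ea`, the "inside assert" variant reports hasNext() == true and then returns null from next(), which matches the symptom described earlier in the thread; run with `-ea`, both variants return the first element. That difference in JVM flags is one plausible reason a bug of this shape would show up on one runner and not another.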
