That fixed it.  I assume the issue is that the `assert` isn't run in
Dataflow v1 (because asserts are disabled?), so maybeAdvance() is never
called even though it should be?

Interestingly enough, IntelliJ actually flags this with a warning:
"'assert' has side effects: call to 'maybeAdvance()' mutates field 'next'"
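
A minimal sketch of the suspected failure mode, for anyone following along.
It is a hypothetical lookahead iterator, not the actual CoGbkResult code: the
class, the advanceInsideAssert flag, and the helper methods are made up for
illustration, and only the maybeAdvance() and `next` names come from the
IntelliJ warning above. It assumes the advance call sits inside an `assert`,
so the JVM skips it entirely unless assertions are enabled (-ea).

```java
import java.util.Arrays;
import java.util.Iterator;

class AssertSideEffectSketch {

  /** Hypothetical lookahead iterator; NOT the real CoGbkResult implementation. */
  static class LookaheadIterator implements Iterator<String> {
    private final Iterator<String> source;
    private final boolean advanceInsideAssert; // illustration-only switch
    private String next; // null means "nothing buffered yet"

    LookaheadIterator(Iterator<String> source, boolean advanceInsideAssert) {
      this.source = source;
      this.advanceInsideAssert = advanceInsideAssert;
    }

    /** Buffers the next element if none is buffered. This mutates `next`. */
    private boolean maybeAdvance() {
      if (next == null && source.hasNext()) {
        next = source.next();
      }
      return true;
    }

    @Override
    public boolean hasNext() {
      if (advanceInsideAssert) {
        // Suspected bug pattern: without -ea the whole expression, including
        // the call, is never evaluated, so `next` stays null.
        assert maybeAdvance();
      } else {
        // Side effect moved out of the assert, so it always runs.
        maybeAdvance();
      }
      return next != null || source.hasNext();
    }

    @Override
    public String next() {
      // Simplified: hands back whatever hasNext() buffered (possibly null).
      String result = next;
      next = null;
      return result;
    }
  }

  /** Standard idiom for detecting whether assertions are enabled. */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    assert enabled = true; // intentional side effect, runs only with -ea
    return enabled;
  }

  /** Returns the first element seen after a successful hasNext(). */
  static String firstElement(boolean advanceInsideAssert) {
    Iterator<String> it =
        new LookaheadIterator(Arrays.asList("a", "b").iterator(), advanceInsideAssert);
    return it.hasNext() ? it.next() : null;
  }

  public static void main(String[] args) {
    System.out.println("assertions enabled:  " + assertionsEnabled());
    System.out.println("call outside assert: " + firstElement(false));
    System.out.println("call inside assert:  " + firstElement(true));
  }
}
```

With assertions disabled, the "inside assert" variant reports hasNext() == true
and then hands back null from next(), which is the same symptom Claire logged
from TagIterable. With -ea both variants return the first element, which would
also explain why a unit test run with assertions on does not reproduce it.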

On Thu, Mar 24, 2022 at 7:21 PM Robert Bradshaw <rober...@google.com> wrote:

> I'm still scratching my head over this one, but could you try
> reproducing with https://github.com/apache/beam/pull/17180 ?
>
> On Tue, Mar 22, 2022 at 8:31 PM Claire McGinty
> <claire.d.mcgi...@gmail.com> wrote:
> >
> > (Forgot to add, this was with the transform coder wrapped with
> NullableCoder so it didn’t fail outright.)
> >
> > - Claire
> >
> > On Tue, Mar 22, 2022 at 11:26 PM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>
> >> I was wondering if there’s any difference in how Dataflow v1 vs v2
> loads CoGbkResult iterables - in one of our internal pipelines that
> started failing, I added some logging to CoGbkResult and found that the
> TagIterable’s iterator would return hasNext() == true, but next() == null.
> >>
> >> - Claire
> >>
> >> On Tue, Mar 22, 2022 at 6:13 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>
> >>> BEAM-13541 is as far as I'm aware the only major change that has
> >>> happened in this code recently, but this should be totally agnostic of
> >>> Dataflow v1 vs. v2 vs. DirectRunner. Have there been any changes to
> >>> the shuffle/reiterable code?
> >>>
> >>> On Tue, Mar 22, 2022 at 2:37 PM Steve Niemitz <sniem...@apache.org>
> wrote:
> >>> >
> >>> > oh I just realized I responded to a different thread, feel free to
> ignore me.
> >>> >
> >>> > On Tue, Mar 22, 2022 at 5:34 PM Niel Markwick <ni...@google.com>
> wrote:
> >>> >>
> >>> >> yeah there does seem to be some heisenbug attributes...
> >>> >> It failed on 4 out of 6 runs with this reproducer, and always
> succeeded with DirectRunner or 2.35.0...
> >>> >> It does seem to depend on the number of elements in the group.
> >>> >>
> >>> >>
> >>> >> On Tue, 22 Mar 2022 at 22:27, Steve Niemitz <sniem...@apache.org>
> wrote:
> >>> >>>
> >>> >>> Your email was actually what made me notice this! :D
> >>> >>>
> >>> >>> I haven't been able to reproduce the NPE you found (also on 2.37)
> but that certainly doesn't mean it's not a bug.
> >>> >>>
> >>> >>>
> >>> >>>
> >>> >>> On Tue, Mar 22, 2022 at 5:23 PM Niel Markwick <ni...@google.com>
> wrote:
> >>> >>>>
> >>> >>>> I have also seen this with Java beam 2.36.0 and 2.37.0, again
> with large groups...
> >>> >>>>
> >>> >>>> The behaviour is that some elements become null, which then
> causes either coder problems for the primitive types (as seen in Claire's
> stack trace), or downstream issues for custom types which are not expecting
> null elements...
> >>> >>>>
> >>> >>>> I have a relatively simple Java reproducer creating 200,000 KVs
> and doing a CoGroupByKey with a single KV. This runs fine on DirectRunner,
> and on Dataflow with 2.35.0, but fails on Dataflow on 2.36.0 and 2.37.0
> with the same error: "org.apache.beam.sdk.coders.CoderException: cannot
> encode a null String" in the CoGroupByKey step
> >>> >>>>
> >>> >>>>     Pipeline p =
> Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
> >>> >>>>     String[] bigarray = new String[200000];
> >>> >>>>     Arrays.fill(bigarray, "Hello World");
> >>> >>>>     PCollection<KV<Integer, String>> right =
> >>> >>>>         p.apply(Create.of(Arrays.asList(bigarray)))
> >>> >>>>             .apply(
> >>> >>>>                 MapElements.into(new TypeDescriptor<KV<Integer,
> String>>() {})
> >>> >>>>                     .via(input -> KV.of(100, input)));
> >>> >>>>     PCollection<KV<Integer, String>> left =
> p.apply(Create.of(KV.of(100, "goodbye world")));
> >>> >>>>
> >>> >>>>     TupleTag<String> leftTag = new TupleTag<String>();
> >>> >>>>     TupleTag<String> rightTag = new TupleTag<String>();
> >>> >>>>
> >>> >>>>     PCollection<KV<Integer, CoGbkResult>> joinedResult =
> >>> >>>>         KeyedPCollectionTuple.of(leftTag, left)
> >>> >>>>             .and(rightTag, right)
> >>> >>>>             .apply("Join By Key", CoGroupByKey.<Integer>create());
> >>> >>>>
> >>> >>>>     joinedResult.apply(
> >>> >>>>         "Report",
> >>> >>>>         MapElements.into(TypeDescriptor.of(Void.class))
> >>> >>>>             .via(
> >>> >>>>                 (KV<Integer, CoGbkResult> element) -> {
> >>> >>>>                   Iterable<String> leftValues =
> element.getValue().getAll(leftTag);
> >>> >>>>                   Iterable<String> rightValues =
> element.getValue().getAll(rightTag);
> >>> >>>>
> >>> >>>>                   System.out.println("Key = " + element.getKey());
> >>> >>>>                   System.out.println("left size= " +
> Iterables.size(leftValues));
> >>> >>>>                   System.out.println("right size= " +
> Iterables.size(rightValues));
> >>> >>>>                   return (Void) null;
> >>> >>>>                 }));
> >>> >>>>
> >>> >>>>     p.run().waitUntilFinish();
> >>> >>>>
> >>> >>>>
> >>> >>>> On Fri, 18 Mar 2022, 16:01 Reuven Lax, <re...@google.com> wrote:
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> On Fri, Mar 18, 2022, 8:28 AM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>> >>>>>>
> >>> >>>>>> To add more investigative context, we discovered that these
> jobs succeed when run with Dataflow Prime; it's just Beam 2.36.0 + Dataflow
> V1 that produces this regression. Unfortunately it's not feasible to
> upgrade our whole fleet to Dataflow Prime right now, so we've created an
> internal Google support ticket for this, too.
> >>> >>>>>>
> >>> >>>>>> - Claire
> >>> >>>>>>
> >>> >>>>>> On Tue, Mar 15, 2022 at 12:22 PM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Hi! No, there aren't null strings in the data--it would have
> thrown an NPE much earlier if that were the case, and likely failed when run
> with Beam 2.35.0 also, unless null-handling changed too?
> >>> >>>>>>>
> >>> >>>>>>> - Claire
> >>> >>>>>>>
> >>> >>>>>>> On Tue, Mar 15, 2022 at 12:19 PM Reuven Lax <re...@google.com>
> wrote:
> >>> >>>>>>>>
> >>> >>>>>>>> Is it expected to have null strings in your data?
> >>> >>>>>>>>
> >>> >>>>>>>> On Tue, Mar 15, 2022 at 9:06 AM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>> >>>>>>>>>
> >>> >>>>>>>>> Hi Beam devs!
> >>> >>>>>>>>>
> >>> >>>>>>>>> I wanted to surface a possible regression with CoGroupByKeys
> in this transform. Our organization (which primarily uses Scio) has had
> several pipelines start failing after upgrading from Beam 2.35.0 to Beam
> 2.36.0, specifically during cogroup operations with large (>10,000) key
> groups.
> >>> >>>>>>>>>
> >>> >>>>>>>>> We initially saw this problem using Scio join operations,
> but I was able to reproduce using public datasets & Beam PTransforms
> directly (job code link; the repo README includes instructions to run it if
> anyone is interested). This job throws the following error when run in
> Dataflow with Beam 2.36.0:
> >>> >>>>>>>>>
> >>> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unable to
> encode element
> '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb,
> org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]'
> with coder
> 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
> org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
> >>> >>>>>>>>>
> org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
> >>> >>>>>>>>>
> org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
> >>> >>>>>>>>>
> org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
> >>> >>>>>>>>>
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
> >>> >>>>>>>>>
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
> >>> >>>>>>>>>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
> >>> >>>>>>>>>
> org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
> >>> >>>>>>>>> Caused by: org.apache.beam.sdk.coders.CoderException: cannot
> encode a null String
> org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
> >>> >>>>>>>>>
> >>> >>>>>>>>> To troubleshoot, I first tried downgrading to Beam 2.35.0;
> the job then ran successfully. Next, I tried running the job using Beam
> 2.36.0, but with 2.35.0's version of CoGbkResult on the classpath; it ran
> successfully then, too. I'm wondering if it's related to the lazy-iterator
> improvements introduced in BEAM-13541? I wasn't able to reproduce it with a
> unit test yet, unfortunately; maybe it's unique to the way Dataflow
> lazily loads large CoGbkResults.
> >>> >>>>>>>>>
> >>> >>>>>>>>> I'm continuing to try to reproduce this on a smaller scale,
> but wanted to flag it here for now; it's preventing our pipeline fleet from
> upgrading to Beam 2.36.0. Any insight would be appreciated!
> >>> >>>>>>>>>
> >>> >>>>>>>>> Thanks,
> >>> >>>>>>>>> Claire
> >>> >>>>>>>>>
> >>> >>>>>>>>>
> >>> >>>>>>>>>
> >>> >>>>>>>>>
>
