To add a bit more investigative context: we discovered that these jobs
succeed when run with Dataflow Prime; it's only Beam 2.36.0 + Dataflow V1
that exhibits this regression. Unfortunately it's not feasible to upgrade
our whole fleet to Dataflow Prime right now, so we've also filed an
internal Google support ticket for this.

- Claire

On Tue, Mar 15, 2022 at 12:22 PM Claire McGinty <claire.d.mcgi...@gmail.com>
wrote:

> Hi! No, there aren't null strings in the data; it would have thrown an NPE
> much earlier if that were the case, and it likely would have failed with
> Beam 2.35.0 as well, unless null handling changed too?
>
> - Claire
>
> On Tue, Mar 15, 2022 at 12:19 PM Reuven Lax <re...@google.com> wrote:
>
>> Is it expected to have null strings in your data?
>>
>> On Tue, Mar 15, 2022 at 9:06 AM Claire McGinty <
>> claire.d.mcgi...@gmail.com> wrote:
>>
>>> Hi Beam devs!
>>>
>>> I wanted to surface a possible regression in the CoGroupByKey transform.
>>> Our organization (which primarily uses Scio) has had several pipelines
>>> start failing after upgrading from Beam 2.35.0 to Beam 2.36.0,
>>> specifically during cogroup operations with large (>10,000-element) key
>>> groups.
>>>
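>>> To make the shape of the failing operation concrete, here's a minimal
>>> Java SDK sketch (the class name, tags, and toy data below are purely
>>> illustrative, not our production code):
>>>
>>>   import org.apache.beam.sdk.Pipeline;
>>>   import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>   import org.apache.beam.sdk.transforms.Create;
>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>   import org.apache.beam.sdk.transforms.ParDo;
>>>   import org.apache.beam.sdk.transforms.join.CoGbkResult;
>>>   import org.apache.beam.sdk.transforms.join.CoGroupByKey;
>>>   import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
>>>   import org.apache.beam.sdk.values.KV;
>>>   import org.apache.beam.sdk.values.PCollection;
>>>   import org.apache.beam.sdk.values.TupleTag;
>>>
>>>   public class CoGbkShape {
>>>     public static void main(String[] args) {
>>>       Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>
>>>       final TupleTag<String> lhsTag = new TupleTag<>();
>>>       final TupleTag<String> rhsTag = new TupleTag<>();
>>>
>>>       // In the real pipelines both inputs are keyed by a String, and some
>>>       // keys have well over 10,000 values on each side.
>>>       PCollection<KV<String, String>> lhs =
>>>           p.apply("lhs", Create.of(KV.of("k", "a1"), KV.of("k", "a2")));
>>>       PCollection<KV<String, String>> rhs =
>>>           p.apply("rhs", Create.of(KV.of("k", "b1"), KV.of("k", "b2")));
>>>
>>>       PCollection<KV<String, CoGbkResult>> joined =
>>>           KeyedPCollectionTuple.of(lhsTag, lhs)
>>>               .and(rhsTag, rhs)
>>>               .apply(CoGroupByKey.create());
>>>
>>>       // Downstream we just iterate both sides of each CoGbkResult; per the
>>>       // stack trace below, the failure happens while Dataflow encodes the
>>>       // KV<String, CoGbkResult> emitted by CoGroupByKey to observe its
>>>       // byte size.
>>>       joined.apply(
>>>           ParDo.of(
>>>               new DoFn<KV<String, CoGbkResult>, String>() {
>>>                 @ProcessElement
>>>                 public void processElement(ProcessContext c) {
>>>                   for (String s : c.element().getValue().getAll(lhsTag)) {
>>>                     c.output(c.element().getKey() + ":" + s);
>>>                   }
>>>                 }
>>>               }));
>>>
>>>       p.run();
>>>     }
>>>   }
>>>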
>>> We initially saw this problem in Scio join
>>> <https://github.com/spotify/scio/blob/be7166c58450bd59ae5d0048d39ef3ea0d5ed107/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala#L84-L112>
>>> operations, but I was able to reproduce it using public datasets and Beam
>>> PTransforms directly (job code link
>>> <https://github.com/clairemcginty/beam-test/blob/main/src/main/scala/clairem/data/JoinTest.scala#L32>;
>>> the repo README includes instructions for running it if anyone is
>>> interested). The job throws the following error when run on Dataflow with
>>> Beam 2.36.0:
>>> Caused by: java.lang.IllegalArgumentException: Unable to encode element '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb, org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]' with coder 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
>>>   org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>   org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>   org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
>>>   org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
>>>   org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
>>>   org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
>>>   org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
>>>   org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
>>>   org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
>>>   org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>   org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>>>   org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>>>   org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>>>   org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>>>   org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>>>   org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
>>> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
>>>   org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
>>>
>>> To troubleshoot, I first tried downgrading to Beam 2.35.0; the job then
>>> ran successfully. Next, I tried running the job with Beam 2.36.0 but with
>>> 2.35.0's version of CoGbkResult
>>> <https://github.com/apache/beam/blob/v2.35.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java>
>>> on the classpath; that also ran successfully. I'm wondering whether this
>>> is related to the lazy-iterator improvements introduced in BEAM-13541
>>> <https://github.com/apache/beam/pull/16354>. I haven't been able to
>>> reproduce it with a unit test yet, unfortunately; maybe it's specific to
>>> the way Dataflow lazily loads large CoGbkResults.
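>>>
>>> For reference, my unit-test repro attempts look roughly like the sketch
>>> below (the class/test names, key, and sizes are made up for illustration);
>>> a DirectRunner test along these lines passes for me:
>>>
>>>   import java.util.ArrayList;
>>>   import java.util.List;
>>>   import org.apache.beam.sdk.testing.PAssert;
>>>   import org.apache.beam.sdk.testing.TestPipeline;
>>>   import org.apache.beam.sdk.transforms.Create;
>>>   import org.apache.beam.sdk.transforms.join.CoGbkResult;
>>>   import org.apache.beam.sdk.transforms.join.CoGroupByKey;
>>>   import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
>>>   import org.apache.beam.sdk.values.KV;
>>>   import org.apache.beam.sdk.values.PCollection;
>>>   import org.apache.beam.sdk.values.TupleTag;
>>>   import org.junit.Rule;
>>>   import org.junit.Test;
>>>
>>>   public class LargeCoGbkResultTest {
>>>     @Rule public final transient TestPipeline p = TestPipeline.create();
>>>
>>>     @Test
>>>     public void largeKeyGroupRoundTrips() {
>>>       final TupleTag<String> lhsTag = new TupleTag<>();
>>>       final TupleTag<String> rhsTag = new TupleTag<>();
>>>
>>>       // A single hot key with >10,000 values per side, mimicking the
>>>       // key-group sizes where the Dataflow jobs fail.
>>>       List<KV<String, String>> lhs = new ArrayList<>();
>>>       List<KV<String, String>> rhs = new ArrayList<>();
>>>       for (int i = 0; i < 20_000; i++) {
>>>         lhs.add(KV.of("hot-key", "lhs-" + i));
>>>         rhs.add(KV.of("hot-key", "rhs-" + i));
>>>       }
>>>
>>>       PCollection<KV<String, CoGbkResult>> joined =
>>>           KeyedPCollectionTuple.of(lhsTag, p.apply("lhs", Create.of(lhs)))
>>>               .and(rhsTag, p.apply("rhs", Create.of(rhs)))
>>>               .apply(CoGroupByKey.create());
>>>
>>>       // Check that the lhs side of the single key group comes through intact.
>>>       PAssert.that(joined)
>>>           .satisfies(
>>>               results -> {
>>>                 for (KV<String, CoGbkResult> kv : results) {
>>>                   int lhsCount = 0;
>>>                   for (String ignored : kv.getValue().getAll(lhsTag)) {
>>>                     lhsCount++;
>>>                   }
>>>                   if (lhsCount != 20_000) {
>>>                     throw new AssertionError("expected 20000 values, got " + lhsCount);
>>>                   }
>>>                 }
>>>                 return null;
>>>               });
>>>
>>>       p.run().waitUntilFinish();
>>>     }
>>>   }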
>>>
>>> I'm continuing to try to reproduce this on a smaller scale, but wanted
>>> to flag it here for now; it's preventing our pipeline fleet from upgrading
>>> to Beam 2.36.0. Any insight would be appreciated!
>>>
>>> Thanks,
>>> Claire
>>>
