Hi Beam devs!

I wanted to surface a possible regression in the CoGroupByKey transform.
Our organization (which primarily uses Scio) has had several pipelines
start failing after upgrading from Beam 2.35.0 to Beam 2.36.0,
specifically during cogroup operations on large key groups (>10,000
values per key).

We initially saw this problem using Scio join
<https://github.com/spotify/scio/blob/be7166c58450bd59ae5d0048d39ef3ea0d5ed107/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala#L84-L112>
operations,
but I was able to reproduce it using public datasets & Beam PTransforms
directly (job code link
<https://github.com/clairemcginty/beam-test/blob/main/src/main/scala/clairem/data/JoinTest.scala#L32>;
the repo README includes instructions to run it if anyone is interested).
This job throws the following error when run in Dataflow with Beam 2.36.0:

Caused by: java.lang.IllegalArgumentException: Unable to encode element
'[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb,
org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]' with
coder
'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null
String
org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)
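For context, the failing job boils down to a plain CoGroupByKey over two
keyed collections, consuming both tagged iterables per key. A minimal Java
sketch of that shape (class name, step names, and inputs here are
placeholders, not the actual datasets from the linked job):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

public class CoGbkSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder inputs; the real job reads two large public datasets.
    PCollection<KV<String, String>> lhs =
        p.apply("Lhs", Create.of(KV.of("k", "a1"), KV.of("k", "a2")));
    PCollection<KV<String, String>> rhs =
        p.apply("Rhs", Create.of(KV.of("k", "b1")));

    TupleTag<String> lhsTag = new TupleTag<String>() {};
    TupleTag<String> rhsTag = new TupleTag<String>() {};

    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(lhsTag, lhs)
            .and(rhsTag, rhs)
            .apply(CoGroupByKey.create());

    // Consume both tagged iterables per key; with >10,000 values per key,
    // Dataflow pages these iterables in lazily, which is where we see the
    // failure on 2.36.0.
    grouped.apply(
        "FormatJoin",
        MapElements.into(TypeDescriptors.strings())
            .via(
                kv -> {
                  int l = 0, r = 0;
                  for (String ignored : kv.getValue().getAll(lhsTag)) l++;
                  for (String ignored : kv.getValue().getAll(rhsTag)) r++;
                  return kv.getKey() + ": " + l + " x " + r;
                }));

    p.run().waitUntilFinish();
  }
}
```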

To troubleshoot, I first tried downgrading to Beam 2.35.0; the job then ran
successfully. Next, I tried running the job using Beam 2.36.0, but with
2.35.0's version of CoGbkResult
<https://github.com/apache/beam/blob/v2.35.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java>
on
the classpath; it ran successfully then, too. Could this be related to the
lazy-iterator improvements introduced in BEAM-13541
<https://github.com/apache/beam/pull/16354>? I haven't been able to
reproduce it with a unit test yet, unfortunately; maybe it's unique to the
way Dataflow lazily loads large CoGbkResults.
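For anyone who wants to poke at it, this is roughly the shape of the
local-runner repro attempt I've been trying (a sketch with one hypothetical
hot key; so far it passes on the DirectRunner, consistent with the failure
being Dataflow-specific):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

public class LargeKeyGroupRepro {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // One hot key with well over 10,000 values on each side, mimicking
    // the key-group sizes where the Dataflow job fails.
    List<KV<String, String>> lhsRows = new ArrayList<>();
    List<KV<String, String>> rhsRows = new ArrayList<>();
    for (int i = 0; i < 20000; i++) {
      lhsRows.add(KV.of("hot-key", "l" + i));
      rhsRows.add(KV.of("hot-key", "r" + i));
    }

    TupleTag<String> lhsTag = new TupleTag<String>() {};
    TupleTag<String> rhsTag = new TupleTag<String>() {};

    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(lhsTag, p.apply("Lhs", Create.of(lhsRows)))
            .and(rhsTag, p.apply("Rhs", Create.of(rhsRows)))
            .apply(CoGroupByKey.create());

    PCollection<String> sizes =
        grouped.apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    kv -> {
                      int l = 0, r = 0;
                      for (String ignored : kv.getValue().getAll(lhsTag)) l++;
                      for (String ignored : kv.getValue().getAll(rhsTag)) r++;
                      return kv.getKey() + ":" + l + ":" + r;
                    }));

    // Holds on the DirectRunner; only the Dataflow run hits the coder error.
    PAssert.that(sizes).containsInAnyOrder("hot-key:20000:20000");
    p.run().waitUntilFinish();
  }
}
```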

I'm continuing to try to reproduce this on a smaller scale, but wanted to
flag it here for now; it's preventing our pipeline fleet from upgrading to
Beam 2.36.0. Any insight would be appreciated!

Thanks,
Claire
