Hi Beam devs! I wanted to surface a possible regression with CoGroupByKeys in this transform. Our organization (which primarily uses Scio) has had several pipelines start failing after upgrading from Beam 2.35.0 to Beam 2.36.0, specifically during cogroup operations with large (>10,000) key groups.
We initially saw this problem using Scio join <https://github.com/spotify/scio/blob/be7166c58450bd59ae5d0048d39ef3ea0d5ed107/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala#L84-L112> operations, but I was able to reproduce using public datasets & Beam PTransforms directly (job code link <https://github.com/clairemcginty/beam-test/blob/main/src/main/scala/clairem/data/JoinTest.scala#L32>; the repo README includes instructions to run it if anyone is interested). This job throws the following error when run in Dataflow with Beam 2.36.0:

Caused by: java.lang.IllegalArgumentException: Unable to encode element '[org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@33dd2cbb, org.apache.beam.sdk.transforms.join.CoGbkResult$TagIterable@26edfb82]' with coder 'org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@346927'.
    org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
    org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
    org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
    org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
    org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
    org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
    org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74)

To troubleshoot, I first tried downgrading to Beam 2.35.0; the job then ran successfully. Next, I tried running the job using Beam 2.36.0, but with 2.35.0's version of CoGbkResult <https://github.com/apache/beam/blob/v2.35.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java> on the classpath; it ran successfully then, too.

I'm wondering if it's related to the lazy-iterator improvements introduced in BEAM-13541 <https://github.com/apache/beam/pull/16354>? I wasn't able to reproduce it with a unit test yet, unfortunately; maybe it's unique to the way Dataflow lazily-loads large CoGbkResults.

I'm continuing to try to reproduce this on a smaller scale, but wanted to flag it here for now; it's preventing our pipeline fleet from upgrading to Beam 2.36.0. Any insight would be appreciated!

Thanks,
Claire