> If any inputs are a sequence that has pcollections, we'll raise an error now, but we'll still allow non-pcollection iterables.
+1 to this approach. I think we will break (arguably) real pipelines with the original change - I've seen people do things like that before on purpose. `(pcoll1, (pcoll2, pcoll3)) | Flatten()` seems like it is probably always a bug.

On Fri, Aug 15, 2025 at 9:41 AM Joey Tran wrote:

> The broken tests were how I discovered this was a backwards incompatible
> change.
>
> I'm updating the PR now to instead just spot check the Flatten inputs. If
> any inputs are a sequence that has pcollections, we'll raise an error now,
> but we'll still allow non-pcollection iterables.
>
> This means the example I posted in the OP will fail at pipeline expansion
> time but:
> ```
> ([1, 2], pcoll_A) | Flatten()
> ```
> will still work.
>
> On Fri, Aug 15, 2025 at 9:03 AM XQ Hu via dev wrote:
>
>> This already broke some tests. I think we should just make typehinting
>> errors better and should not change this behavior.
>>
>> On Thu, Aug 14, 2025 at 7:33 PM Joey Tran wrote:
>>
>>> Hey all,
>>>
>>> I just helped a user debug an issue that arose from them accidentally
>>> passing a DoOutputsTuple into `Flatten`, e.g. something like
>>> ```
>>> inputs_A = p | beam.Create([1,2,3])
>>> inputs_B = inputs_A | ParDo(MyDoFn()).with_outputs("even")  # this is actually a tuple
>>> (inputs_A, inputs_B) | Flatten()
>>> ```
>>>
>>> This resulted in really confusing typehinting errors about pcollections
>>> of pcollections. I've also seen this result in confusion in another place
>>> where someone ended up with some kind of unpicklable pcollection elements in
>>> a pcollection.
>>>
>>> I put up a PR[1] to validate the inputs to Flatten but I realize now
>>> it's a backwards incompatible change. Does anyone have any comments or
>>> objections to this change? I didn't even realize you could apply transforms
>>> to raw iterables before this (e.g. `[1, 2, 3] | beam.Map(lambda x:x+1)`)
>>>
>>> [1] https://github.com/apache/beam/pull/35874
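For illustration, the spot check described in this thread (reject a Flatten input that is a sequence containing pcollections, but keep allowing plain iterables like `[1, 2]`) could look roughly like the sketch below. It is plain Python, not the code from the PR: `FakePCollection` is a hypothetical stand-in for Beam's `PCollection`, and `validate_flatten_inputs` is a hypothetical helper name.

```python
class FakePCollection:
    """Hypothetical stand-in for apache_beam.pvalue.PCollection (sketch only)."""

    def __init__(self, label):
        self.label = label

    def __repr__(self):
        return f"FakePCollection({self.label!r})"


def validate_flatten_inputs(inputs):
    """Raise TypeError if any Flatten input is a list/tuple containing pcollections.

    Top-level pcollections and plain iterables of non-pcollection values
    (e.g. [1, 2]) are returned unchanged.
    """
    for position, item in enumerate(inputs):
        # Only lists and tuples are inspected, so the check never consumes
        # a one-shot iterator that Flatten would later need to read.
        if isinstance(item, (list, tuple)) and any(
                isinstance(element, FakePCollection) for element in item):
            raise TypeError(
                f"Flatten input at position {position} is a sequence containing "
                f"pcollections: {item!r}. Pass each pcollection as a separate "
                "input instead.")
    return inputs


if __name__ == "__main__":
    pcoll_A = FakePCollection("A")
    pcoll_B = FakePCollection("B")
    # Allowed: a plain iterable next to a pcollection.
    validate_flatten_inputs(([1, 2], pcoll_A))
    # Rejected: a nested tuple of pcollections.
    try:
        validate_flatten_inputs((pcoll_A, (pcoll_B, pcoll_A)))
    except TypeError as err:
        print(err)
```

Restricting the inspection to lists and tuples keeps the check free of side effects. A real check would likely also need to recognize Beam's `DoOutputsTuple`, the case that started the thread, which this sketch does not model.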
