> If any inputs are a sequence that has pcollections, we'll raise an error
> now, but we'll still allow non-pcollection iterables.

+1 to this approach. I think we will break (arguably) real pipelines with
the original change - I've seen people do things like that before on
purpose.

`(pcoll1, (pcoll2, pcoll3)) | Flatten()`

seems like it is probably always a bug.
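
For concreteness, here is a rough sketch of the spot check as I understand it from the description above. It is not the code in the PR. `FakePCollection` and `check_flatten_inputs` are hypothetical names, and the stand-in class exists only so the snippet runs without Beam installed. I'm also assuming that looking only at lists and tuples is enough for illustration; a real check would additionally have to recognize however Beam represents a DoOutputsTuple.

```python
class FakePCollection:
    """Hypothetical stand-in for a Beam PCollection (not the real class)."""


def check_flatten_inputs(inputs, pcoll_type=FakePCollection):
    """Raise TypeError if any input is a list/tuple that contains a pcollection.

    Pcollections themselves and plain iterables of values are accepted.
    Only lists and tuples are inspected, so generators are never consumed.
    """
    for index, item in enumerate(inputs):
        if isinstance(item, pcoll_type):
            continue  # a pcollection at the top level is a valid input
        if isinstance(item, (list, tuple)) and any(
            isinstance(element, pcoll_type) for element in item
        ):
            raise TypeError(
                f"Flatten input {index} is a sequence containing a "
                "pcollection; did you mean to pass its elements separately?"
            )


pcoll1, pcoll2, pcoll3 = FakePCollection(), FakePCollection(), FakePCollection()

check_flatten_inputs((pcoll1, pcoll2))   # accepted: only pcollections
check_flatten_inputs(([1, 2], pcoll1))   # accepted: plain iterable of values

try:
    check_flatten_inputs((pcoll1, (pcoll2, pcoll3)))
except TypeError as error:
    print("rejected:", error)
```

With something like this, the nested case fails at pipeline expansion time with a direct message instead of surfacing later as a typehinting error.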

On Fri, Aug 15, 2025 at 9:41 AM Joey Tran <[email protected]> wrote:

> The broken tests were how I discovered this was a backwards incompatible
> change.
>
> I'm updating the PR now to instead just spot check the Flatten inputs. If
> any inputs are a sequence that has pcollections, we'll raise an error now,
> but we'll still allow non-pcollection iterables.
>
> This means the example I posted in the OP will fail at pipeline expansion
> time but:
> ```
> ([1, 2], pcoll_A) | Flatten()
> ```
> will still work
>
> On Fri, Aug 15, 2025 at 9:03 AM XQ Hu via dev <[email protected]> wrote:
>
>> This already broke some tests. I think we should just make typehinting
>> errors better and should not change this behavior.
>>
>> On Thu, Aug 14, 2025 at 7:33 PM Joey Tran <[email protected]>
>> wrote:
>>
>>> Hey all,
>>>
>>> I just helped a user debug an issue that arose from them accidentally
>>> passing a DoOutputsTuple into `Flatten`, e.g. something like
>>> ```
>>> inputs_A = p | beam.Create([1,2,3])
>>> inputs_B = inputs_A | ParDo(MyDoFn()).with_outputs("even")  # this is actually a tuple
>>> (inputs_A, inputs_B) | Flatten()
>>> ```
>>>
>>> This resulted in really confusing typehinting errors about pcollections
>>> of pcollections. I've also seen this result in confusion in another place
>>> where someone ended up with some kind of unpicklable pcollection elements in
>>> a pcollection.
>>>
>>> I put up a PR[1] to validate the inputs to Flatten but I realize now
>>> it's a backwards incompatible change. Does anyone have any comments or
>>> objections to this change? I didn't even realize you could apply transforms
>>> to raw iterables before this (e.g. `[1, 2, 3] | beam.Map(lambda x:x+1)`)
>>>
>>> [1] https://github.com/apache/beam/pull/35874
>>>
>>
