Udi, do you know if we have a bug tracking this issue?

If not, can you file one referencing this e-mail thread?

On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:

> Thanks Udi. I agree, since the pipeline works fine when I remove either the
> side input or the last Flatten and combine operation.
>
> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>
>> This looks like a FnApiRunner bug.
>> When I override use_fnapi_runner = False in direct_runner.py the pipeline
>> works.
>>
>> It seems like either the side-input to _copy_number or the Flatten
>> operation is the culprit.
>>
>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:
>>
>>> Hi,
>>>
>>> I have a project that started failing with DirectRunner, but works well
>>> using DataflowRunner (last working version is 2.4). The error message I
>>> received is:
>>> line 1088, in run_stage
>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>
>>> I have simplified the pipeline to the following example. Can someone
>>> please take a look? Many thanks!
>>>
>>> Allie
>>>
>>>
>>> import apache_beam as beam
>>> import argparse
>>> from apache_beam import transforms
>>> from apache_beam import pvalue
>>> from apache_beam.options import pipeline_options
>>>
>>>
>>> def _copy_number(number, side=None):
>>>   yield number
>>>
>>>
>>> def fn_sum(values):
>>>   return sum(values)
>>>
>>>
>>> def run(argv=None):
>>>   parser = argparse.ArgumentParser()
>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>   numbers = [1, 2]
>>>   with beam.Pipeline(options=options) as p:
>>>     sum_1 = (p
>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>
>>>     sum_2 = (p
>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>
>>>     _ = ((sum_1, sum_2)
>>>          | beam.Flatten()
>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>
>>>
>>>
>>>
