Sorry this got lost. I filed
https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an
easy fix.

On Wed, Jan 9, 2019 at 8:33 PM Allie Chen <yifangc...@google.com> wrote:
>
> Greetings!
>
> May I ask whether there is any plan to work on this issue? Or if I just use 
> `BundleBasedDirectRunner` instead of `DirectRunner`, will there be any 
> performance issues/caveats I should worry about?
>
> Thanks!
> Allie
>
> On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri <eh...@google.com> wrote:
>>
>> +Robert Bradshaw I would be happy to debug and fix this, but I'd need more 
>> guidance on where to look.
>>
>> On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <eh...@google.com> wrote:
>>>
>>> Created https://issues.apache.org/jira/browse/BEAM-5927
>>>
>>> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>> Udi, do you know if we have a bug tracking this issue?
>>>>
>>>> If not, can you file one referencing this e-mail thread?
>>>>
>>>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
>>>>>
>>>>> Thanks Udi. I agree: it works fine if I remove either the side input
>>>>> or the final Flatten and Combine step.
>>>>>
>>>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>>>>>>
>>>>>> This looks like a FnApiRunner bug.
>>>>>> When I override use_fnapi_runner = False in direct_runner.py, the
>>>>>> pipeline works.
>>>>>>
>>>>>> It seems like either the side input to _copy_number or the Flatten
>>>>>> operation is the culprit.
>>>>>>
>>>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a project that started failing on DirectRunner but still works
>>>>>>> on DataflowRunner (the last version that worked on DirectRunner was
>>>>>>> 2.4). The error message I received is:
>>>>>>> line 1088, in run_stage
>>>>>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>>>
>>>>>>> I have simplified the pipeline to the following example. Can someone 
>>>>>>> please take a look? Many thanks!
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> import argparse
>>>>>>> from apache_beam import transforms
>>>>>>> from apache_beam import pvalue
>>>>>>> from apache_beam.options import pipeline_options
>>>>>>>
>>>>>>>
>>>>>>> def _copy_number(number, side=None):
>>>>>>>   yield number
>>>>>>>
>>>>>>>
>>>>>>> def fn_sum(values):
>>>>>>>   return sum(values)
>>>>>>>
>>>>>>>
>>>>>>> def run(argv=None):
>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>>>   numbers = [1, 2]
>>>>>>>   with beam.Pipeline(options=options) as p:
>>>>>>>     # sum_1 feeds two consumers: the side input of _copy_number and
>>>>>>>     # the final Flatten.
>>>>>>>     sum_1 = (p
>>>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>>>
>>>>>>>     sum_2 = (p
>>>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>>>              # sum_1 is passed as a singleton side input (bound to `side`).
>>>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>>>
>>>>>>>     # Merge both sums and add them up.
>>>>>>>     _ = ((sum_1, sum_2)
>>>>>>>          | beam.Flatten()
>>>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>>>>>
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>   run()
