Sorry this got lost. I filed https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an easy fix.
On Wed, Jan 9, 2019 at 8:33 PM Allie Chen <yifangc...@google.com> wrote:
>
> Greetings!
>
> May I ask whether there is any plan to work on this issue? Or, if I just use
> `BundleBasedDirectRunner` instead of `DirectRunner`, are there any
> performance issues/caveats I should worry about?
>
> Thanks!
> Allie
>
> On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri <eh...@google.com> wrote:
>>
>> +Robert Bradshaw I would be happy to debug and fix this, but I'd need more
>> guidance on where to look.
>>
>> On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <eh...@google.com> wrote:
>>>
>>> Created https://issues.apache.org/jira/browse/BEAM-5927
>>>
>>> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>> Udi, do you know if we have a bug tracking this issue?
>>>>
>>>> If not, can you file one referencing this e-mail thread?
>>>>
>>>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
>>>>>
>>>>> Thanks Udi. I agree, since it works fine if I remove either the side input
>>>>> or the last Flatten and combine operation.
>>>>>
>>>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>>>>>>
>>>>>> This looks like a FnApiRunner bug.
>>>>>> When I override use_fnapi_runner = False in direct_runner.py, the
>>>>>> pipeline works.
>>>>>>
>>>>>> It seems like either the side input to _copy_number or the Flatten
>>>>>> operation is the culprit.
>>>>>>
>>>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a project that started failing with DirectRunner but works well
>>>>>>> with DataflowRunner (the last working version is 2.4). The error message I
>>>>>>> received is:
>>>>>>>
>>>>>>> line 1088, in run_stage
>>>>>>> pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>>>
>>>>>>> I have simplified the pipeline to the following example. Can someone
>>>>>>> please take a look? Many thanks!
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> import argparse
>>>>>>> from apache_beam import transforms
>>>>>>> from apache_beam import pvalue
>>>>>>> from apache_beam.options import pipeline_options
>>>>>>>
>>>>>>>
>>>>>>> def _copy_number(number, side=None):
>>>>>>>     yield number
>>>>>>>
>>>>>>>
>>>>>>> def fn_sum(values):
>>>>>>>     return sum(values)
>>>>>>>
>>>>>>>
>>>>>>> def run(argv=None):
>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>     _, pipeline_args = parser.parse_known_args(argv)
>>>>>>>     options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>>>     numbers = [1, 2]
>>>>>>>     with beam.Pipeline(options=options) as p:
>>>>>>>         sum_1 = (p
>>>>>>>                  | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>>>                  | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>>>
>>>>>>>         sum_2 = (p
>>>>>>>                  | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>>>                  | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>>>                  | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>>>
>>>>>>>         _ = ((sum_1, sum_2)
>>>>>>>              | beam.Flatten()
>>>>>>>              | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>>>              | beam.io.WriteToText('gs://BUCKET/sum'))
>>>>>>>
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     run()