Created https://issues.apache.org/jira/browse/BEAM-5927
On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:

> Udi, do you know if we have a bug tracking this issue?
>
> If not, can you file one referencing this e-mail thread?
>
> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
>
>> Thanks Udi. I agree, since it works fine when removing either the side
>> input or the last flatten and combine operation.
>>
>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>>
>>> This looks like a FnApiRunner bug.
>>> When I override use_fnapi_runner = False in direct_runner.py the
>>> pipeline works.
>>>
>>> It seems like either the side-input to _copy_number or the Flatten
>>> operation is the culprit.
>>>
>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a project that started failing with DirectRunner, but works well
>>>> using DataflowRunner (last working version is 2.4). The error message I
>>>> received is:
>>>>   line 1088, in run_stage
>>>>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>
>>>> I have simplified the pipeline to the following example. Can someone
>>>> please take a look? Many thanks!
>>>>
>>>> Allie
>>>>
>>>>
>>>> import apache_beam as beam
>>>> import argparse
>>>> from apache_beam import transforms
>>>> from apache_beam import pvalue
>>>> from apache_beam.options import pipeline_options
>>>>
>>>>
>>>> def _copy_number(number, side=None):
>>>>   yield number
>>>>
>>>>
>>>> def fn_sum(values):
>>>>   return sum(values)
>>>>
>>>>
>>>> def run(argv=None):
>>>>   parser = argparse.ArgumentParser()
>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>   numbers = [1, 2]
>>>>   with beam.Pipeline(options=options) as p:
>>>>     sum_1 = (p
>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>
>>>>     sum_2 = (p
>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>
>>>>     _ = ((sum_1, sum_2)
>>>>          | beam.Flatten()
>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))