Thanks, Udi. I agree: the pipeline works fine if I remove either the side input or the final Flatten and combine step.
On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:

> This looks like a FnApiRunner bug.
> When I override use_fnapi_runner = False in direct_runner.py the pipeline
> works.
>
> It seems like either the side-input to _copy_number or the Flatten
> operation is the culprit.
>
> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:
>
>> Hi,
>>
>> I have a project that started failing with DirectRunner, but works well
>> using DataflowRunner (last working version is 2.4). The error message I
>> received is:
>> line 1088, in run_stage
>>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>
>> I have simplified the pipeline to the following example. Can someone
>> please take a look? Many thanks!
>>
>> Allie
>>
>>
>> import apache_beam as beam
>> import argparse
>> from apache_beam import transforms
>> from apache_beam import pvalue
>> from apache_beam.options import pipeline_options
>>
>>
>> def _copy_number(number, side=None):
>>   yield number
>>
>>
>> def fn_sum(values):
>>   return sum(values)
>>
>>
>> def run(argv=None):
>>   parser = argparse.ArgumentParser()
>>   _, pipeline_args = parser.parse_known_args(argv)
>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>   numbers = [1, 2]
>>   with beam.Pipeline(options=options) as p:
>>     sum_1 = (p
>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>
>>     sum_2 = (p
>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>
>>     _ = ((sum_1, sum_2)
>>          | beam.Flatten()
>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>