[ https://issues.apache.org/jira/browse/BEAM-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Bradshaw resolved BEAM-5927.
-----------------------------------
    Resolution: Duplicate
    Fix Version/s: Not applicable

> FnApiRunner KeyError
> --------------------
>
>                 Key: BEAM-5927
>                 URL: https://issues.apache.org/jira/browse/BEAM-5927
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Robert Bradshaw
>            Priority: Major
>             Fix For: Not applicable
>
>
> Code to recreate (modified slightly from
> https://lists.apache.org/thread.html/e910bfe702a3c8c5b0902f5f1c2c51fb7b2574f1b4abc4d9efab4e0f@%3Cdev.beam.apache.org%3E):
> import apache_beam as beam
> import argparse
> from apache_beam import transforms
> from apache_beam import pvalue
> from apache_beam.options import pipeline_options
> import logging
> def _copy_number(number, side=None):
>   print '_copy_number:', number, side
>   yield number
> def fn_sum(values):
>   #print 'values', values
>   return sum(values)
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   _, pipeline_args = parser.parse_known_args(argv)
>   options = pipeline_options.PipelineOptions(pipeline_args)
>   #options.view_as(pipeline_options.StandardOptions).streaming = True
>   numbers = [1, 2]
>   with beam.Pipeline(options=options) as p:
>     sum_1 = (p
>       | 'ReadNumber1' >> transforms.Create(numbers)
>       | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>     sum_2 = (p
>       | 'ReadNumber2' >> transforms.Create(numbers)
>       | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>       | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>     _ = ((sum_1, sum_2)
>       | beam.Flatten()
>       | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>       | beam.io.WriteToText('gs://foo/sum'))
> logging.getLogger().setLevel(logging.INFO)
> run()
> Console:
> $ python test.py
> INFO:root:Missing pipeline option (runner). Executing pipeline using the
> default runner: DirectRunner.
> INFO:root:==================== <function annotate_downstream_side_inputs at
> 0x7f2b7088e758> ====================
> INFO:root:==================== <function fix_side_input_pcoll_coders at
> 0x7f2b7088e7d0> ====================
> INFO:root:==================== <function lift_combiners at 0x7f2b7088e5f0>
> ====================
> INFO:root:==================== <function expand_gbk at 0x7f2b7088e668>
> ====================
> INFO:root:==================== <function sink_flattens at 0x7f2b7088e6e0>
> ====================
> INFO:root:==================== <function greedily_fuse at 0x7f2b7088e848>
> ====================
> INFO:root:==================== <function impulse_to_input at 0x7f2b7088e578>
> ====================
> INFO:root:==================== <function inject_timer_pcollections at
> 0x7f2b7088e8c0> ====================
> INFO:root:==================== <function sort_stages at 0x7f2b7088e938>
> ====================
> INFO:root:Running
> ((ref_AppliedPTransform_ReadNumber1/Read_3)+((ref_AppliedPTransform_CalculateSum1/KeyWithVoid_5)+(CalculateSum1/CombinePerKey/Precombine)))+(CalculateSum1/CombinePerKey/Group/Write)
> INFO:root:Running
> ((CalculateSum1/CombinePerKey/Group/Read)+(CalculateSum1/CombinePerKey/Merge))+((CalculateSum1/CombinePerKey/ExtractOutputs)+((ref_AppliedPTransform_CalculateSum1/UnKey_13)+(ref_PCollection_PCollection_7/Write)))
> INFO:root:Running
> ((ref_AppliedPTransform_CalculateSum1/DoOnce/Read_15)+(((ref_AppliedPTransform_CalculateSum1/InjectDefault_16)+(ref_PCollection_PCollection_9/Write))+(Flatten/Transcode/0)))+(Flatten/Write/0)
> INFO:root:Running
> ((ref_AppliedPTransform_ReadNumber2/Read_18)+((ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_19)+((ref_AppliedPTransform_CalculateSum2/KeyWithVoid_21)+(CalculateSum2/CombinePerKey/Precombine))))+(CalculateSum2/CombinePerKey/Group/Write)
> Traceback (most recent call last):
> File "test.py", line 41, in <module>
> run()
> File "test.py", line 38, in run
> | beam.io.WriteToText('gs://foo/sum'))
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py",
> line 425, in __exit__
> self.run().wait_until_finish()
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py",
> line 405, in run
> self._options).run(False)
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py",
> line 418, in run
> return self.runner.run_pipeline(self)
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
> line 139, in run_pipeline
> return runner.run_pipeline(pipeline)
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
> line 238, in run_pipeline
> return self.run_via_runner_api(pipeline.to_runner_api())
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
> line 241, in run_via_runner_api
> return self.run_stages(*self.create_stages(pipeline_proto))
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
> line 1020, in run_stages
> pcoll_buffers, safe_coders)
> File
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
> line 1096, in run_stage
> pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> KeyError: u'coder_4'

--
This message was sent by Atlassian JIRA (v7.6.3#76005)