[ 
https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094084#comment-17094084
 ] 

Pablo Estrada commented on BEAM-9832:
-------------------------------------

I am not 100% sure, but this may be a fix: 
[https://github.com/apache/beam/pull/11546] : P

> KeyError: 'No such coder: ' in fn_runner_test
> ---------------------------------------------
>
>                 Key: BEAM-9832
>                 URL: https://issues.apache.org/jira/browse/BEAM-9832
>             Project: Beam
>          Issue Type: Test
>          Components: sdk-py-core, test-failures
>            Reporter: Ning Kang
>            Assignee: Pablo Estrada
>            Priority: Critical
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Failed test results can be found 
> [here|[https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/]]
>  
> A stack trace:
> {code:java}
> self = 
> <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithMultiWorkers
>  testMethod=test_read>
>     def test_read(self):
>       # Can't use NamedTemporaryFile as a context
>       # due to https://bugs.python.org/issue14243
>       temp_file = tempfile.NamedTemporaryFile(delete=False)
>       try:
>         temp_file.write(b'a\nb\nc')
>         temp_file.close()
>         with self.create_pipeline() as p:
>           assert_that(
> >             p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', 
> > 'c']))
> apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> apache_beam/pipeline.py:529: in __exit__
>     self.run().wait_until_finish()
> apache_beam/pipeline.py:502: in run
>     self._options).run(False)
> apache_beam/pipeline.py:515: in run
>     return self.runner.run_pipeline(self, self._options)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in 
> run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in 
> run_via_runner_api
>     return self.run_stages(stage_context, stages)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages
>     bundle_context_manager,
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage
>     bundle_manager)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in 
> process_bundle
>     timer_inputs)):
> /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator
>     yield future.result()
> /usr/lib/python3.5/concurrent/futures/_base.py:405: in result
>     return self.__get_result()
> /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result
>     raise self._exception
> apache_beam/utils/thread_pool_executor.py:44: in run
>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute
>     dry_run)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in 
> process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
> apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push
>     response = self.worker.do_instruction(request)
> apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
>     getattr(request, request_type), request.instruction_id)
> apache_beam/runners/worker/sdk_worker.py:500: in process_bundle
>     instruction_id, request.process_bundle_descriptor_id)
> apache_beam/runners/worker/sdk_worker.py:374: in get
>     self.data_channel_factory)
> apache_beam/runners/worker/bundle_processor.py:782: in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)
> apache_beam/runners/worker/bundle_processor.py:836: in <listcomp>
>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
>     result = cache[args] = func(*args)
> apache_beam/runners/worker/bundle_processor.py:820: in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:820: in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:818: in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
>     result = cache[args] = func(*args)
> apache_beam/runners/worker/bundle_processor.py:823: in get_operation
>     transform_id, transform_consumers)
> apache_beam/runners/worker/bundle_processor.py:1108: in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
> apache_beam/runners/worker/bundle_processor.py:1341: in 
> create_pair_with_restriction
>     return _create_sdf_operation(PairWithRestriction, *args)
> apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation
>     parameter)
> apache_beam/runners/worker/bundle_processor.py:1501: in 
> _create_pardo_operation
>     output_coders = factory.get_output_coders(transform_proto)
> apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders
>     pcoll_id in transform_proto.outputs.items()
> apache_beam/runners/worker/bundle_processor.py:1154: in <dictcomp>
>     pcoll_id in transform_proto.outputs.items()
> apache_beam/runners/worker/bundle_processor.py:1139: in get_windowed_coder
>     coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> self = <apache_beam.runners.worker.bundle_processor.BeamTransformFactory 
> object at 0x7f4248bf5160>
> coder_id = ''
>     def get_coder(self, coder_id):
>       # type: (str) -> coders.Coder
>       if coder_id not in self.descriptor.coders:
> >       raise KeyError("No such coder: %s" % coder_id)
> E       KeyError: 'No such coder: '
> apache_beam/runners/worker/bundle_processor.py:1128: KeyError
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to