[ https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094084#comment-17094084 ]
Pablo Estrada commented on BEAM-9832: ------------------------------------- I am not 100% sure, but this may be a fix: [https://github.com/apache/beam/pull/11546] :P > KeyError: 'No such coder: ' in fn_runner_test > --------------------------------------------- > > Key: BEAM-9832 > URL: https://issues.apache.org/jira/browse/BEAM-9832 > Project: Beam > Issue Type: Test > Components: sdk-py-core, test-failures > Reporter: Ning Kang > Assignee: Pablo Estrada > Priority: Critical > Time Spent: 10m > Remaining Estimate: 0h > > Failed test results can be found > [here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/] > > A stack trace: > {code:java} > self = > <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithMultiWorkers > testMethod=test_read> > def test_read(self): > # Can't use NamedTemporaryFile as a context > # due to https://bugs.python.org/issue14243 > temp_file = tempfile.NamedTemporaryFile(delete=False) > try: > temp_file.write(b'a\nb\nc') > temp_file.close() > with self.create_pipeline() as p: > assert_that( > > p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', > > 'c'])) > apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/pipeline.py:529: in __exit__ > self.run().wait_until_finish() > apache_beam/pipeline.py:502: in run > self._options).run(False) > apache_beam/pipeline.py:515: in run > return self.runner.run_pipeline(self, self._options) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in > run_pipeline > pipeline.to_runner_api(default_environment=self._default_environment)) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in > run_via_runner_api > return self.run_stages(stage_context, stages) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages > bundle_context_manager, > 
apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage > bundle_manager) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle > data_input, data_output, input_timers, expected_timer_output) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in > process_bundle > timer_inputs)): > /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator > yield future.result() > /usr/lib/python3.5/concurrent/futures/_base.py:405: in result > return self.__get_result() > /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result > raise self._exception > apache_beam/utils/thread_pool_executor.py:44: in run > self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute > dry_run) > apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in > process_bundle > result_future = self._worker_handler.control_conn.push(process_bundle_req) > apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push > response = self.worker.do_instruction(request) > apache_beam/runners/worker/sdk_worker.py:471: in do_instruction > getattr(request, request_type), request.instruction_id) > apache_beam/runners/worker/sdk_worker.py:500: in process_bundle > instruction_id, request.process_bundle_descriptor_id) > apache_beam/runners/worker/sdk_worker.py:374: in get > self.data_channel_factory) > apache_beam/runners/worker/bundle_processor.py:782: in __init__ > self.ops = self.create_execution_tree(self.process_bundle_descriptor) > apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree > descriptor.transforms, key=topological_height, reverse=True) > apache_beam/runners/worker/bundle_processor.py:836: in <listcomp> > (transform_id, get_operation(transform_id)) for transform_id in sorted( > apache_beam/runners/worker/bundle_processor.py:726: in wrapper > result = cache[args] = func(*args) > 
apache_beam/runners/worker/bundle_processor.py:820: in get_operation > pcoll_id in descriptor.transforms[transform_id].outputs.items() > apache_beam/runners/worker/bundle_processor.py:820: in <dictcomp> > pcoll_id in descriptor.transforms[transform_id].outputs.items() > apache_beam/runners/worker/bundle_processor.py:818: in <listcomp> > tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] > apache_beam/runners/worker/bundle_processor.py:726: in wrapper > result = cache[args] = func(*args) > apache_beam/runners/worker/bundle_processor.py:823: in get_operation > transform_id, transform_consumers) > apache_beam/runners/worker/bundle_processor.py:1108: in create_operation > return creator(self, transform_id, transform_proto, payload, consumers) > apache_beam/runners/worker/bundle_processor.py:1341: in > create_pair_with_restriction > return _create_sdf_operation(PairWithRestriction, *args) > apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation > parameter) > apache_beam/runners/worker/bundle_processor.py:1501: in > _create_pardo_operation > output_coders = factory.get_output_coders(transform_proto) > apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders > pcoll_id in transform_proto.outputs.items() > apache_beam/runners/worker/bundle_processor.py:1154: in <dictcomp> > pcoll_id in transform_proto.outputs.items() > apache_beam/runners/worker/bundle_processor.py:1139: in get_windowed_coder > coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > self = <apache_beam.runners.worker.bundle_processor.BeamTransformFactory > object at 0x7f4248bf5160> > coder_id = '' > def get_coder(self, coder_id): > # type: (str) -> coders.Coder > if coder_id not in self.descriptor.coders: > > raise KeyError("No such coder: %s" % coder_id) > E KeyError: 'No such coder: ' > apache_beam/runners/worker/bundle_processor.py:1128: KeyError > 
{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)