[ https://issues.apache.org/jira/browse/BEAM-10708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263715#comment-17263715 ]
Ning Kang commented on BEAM-10708: ---------------------------------- As of today (2021-01-12), when a pipeline that includes a SqlTransform is executed with InteractiveRunner(), it fails with a stack trace like the following: {code:python} --------------------------------------------------------------------------- ValueError Traceback (most recent call last) <ipython-input-9-ae3e3b2a5b98> in <module> ----> 1 ib.show(pcoll) ~/beam/sdks/python/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs) 226 def run_within_progress_indicator(*args, **kwargs): 227 with ProgressIndicator('Processing...', 'Done.'): --> 228 return func(*args, **kwargs) 229 230 return run_within_progress_indicator ~/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs) 484 recording_manager = ie.current_env().get_recording_manager( 485 user_pipeline, create_if_absent=True) --> 486 recording = recording_manager.record(pcolls, max_n=n, max_duration=duration) 487 488 # Catch a KeyboardInterrupt to gracefully cancel the recording and ~/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration) 420 # arbitrary variables. 421 self._watch(pcolls) --> 422 pipeline_instrument = pi.PipelineInstrument(self.user_pipeline) 423 self.record_pipeline() 424 ~/beam/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py in __init__(self, pipeline, options) 113 # proto is stable. The snapshot of pipeline will not be mutated within this 114 # module and can be used to recover original pipeline if needed. 
--> 115 self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api( 116 pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, options) 117 ie.current_env().add_derived_pipeline(self._pipeline, self._pipeline_snap) ~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context) 900 if proto.root_transform_ids: 901 root_transform_id, = proto.root_transform_ids --> 902 p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] 903 else: 904 p.transforms_stack = [AppliedPTransform(None, None, '', None)] ~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj: --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context) 1250 result.parts = [] 1251 for transform_id in proto.subtransforms: -> 1252 part = context.transforms.get_by_id(transform_id) 1253 part.parent = result 1254 result.parts.append(part) ~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj: --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context) 1250 result.parts = [] 1251 for transform_id in proto.subtransforms: -> 1252 part = context.transforms.get_by_id(transform_id) 1253 part.parent = result 1254 result.parts.append(part) ~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj: --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] 
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context) 1250 result.parts = [] 1251 for transform_id in proto.subtransforms: -> 1252 part = context.transforms.get_by_id(transform_id) 1253 part.parent = result 1254 result.parts.append(part) ~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj: --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context) 1250 result.parts = [] 1251 for transform_id in proto.subtransforms: -> 1252 part = context.transforms.get_by_id(transform_id) 1253 part.parent = result 1254 result.parts.append(part) ~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj: --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context) 1250 result.parts = [] 1251 for transform_id in proto.subtransforms: -> 1252 part = context.transforms.get_by_id(transform_id) 1253 part.parent = result 1254 result.parts.append(part) ~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj: --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context) 1227 ] 1228 -> 1229 transform = ptransform.PTransform.from_runner_api(proto, context) 1230 # Ordering is important here. 
1231 # TODO(BEAM-9635): use key, value pairs instead of depending on tags with ~/beam/sdks/python/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context) 728 parameter_type, constructor = cls._known_urns[proto.spec.urn] 729 --> 730 return constructor( 731 proto, 732 proto_utils.parse_Bytes(proto.spec.payload, parameter_type), ~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api_parameter(unused_ptransform, pardo_payload, context) 1417 def from_runner_api_parameter(unused_ptransform, pardo_payload, context): 1418 fn, args, kwargs, si_tags_and_types, windowing = pickler.loads( -> 1419 DoFnInfo.from_runner_api( 1420 pardo_payload.do_fn, context).serialized_dofn_data()) 1421 if si_tags_and_types: ~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api(spec, unused_context) 1491 return StatelessDoFnInfo(spec.urn) 1492 else: -> 1493 raise ValueError('Unexpected DoFn type: %s' % spec.urn) 1494 1495 @staticmethod ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1 {code} > InteractiveRunner cannot execute pipeline with cross-language transform > ----------------------------------------------------------------------- > > Key: BEAM-10708 > URL: https://issues.apache.org/jira/browse/BEAM-10708 > Project: Beam > Issue Type: Bug > Components: cross-language > Reporter: Brian Hulette > Assignee: Ning Kang > Priority: P2 > > The InteractiveRunner crashes when given a pipeline that includes a > cross-language transform. 
> Here's the example I tried to run in a Jupyter notebook: > {code:python} > p = beam.Pipeline(InteractiveRunner()) > pc = (p | SqlTransform("""SELECT > CAST(1 AS INT) AS `id`, > CAST('foo' AS VARCHAR) AS `str`, > CAST(3.14 AS DOUBLE) AS `flt`""")) > df = interactive_beam.collect(pc) > {code} > The problem occurs when > [pipeline_fragment.py|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L66] > creates a copy of the pipeline by [writing it to proto and reading it > back|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L120]. > Reading it back fails because some of the pipeline is not written in Python. -- This message was sent by Atlassian Jira (v8.3.4#803005)