[ 
https://issues.apache.org/jira/browse/BEAM-10708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263715#comment-17263715
 ] 

Ning Kang commented on BEAM-10708:
----------------------------------

As of today (2021-01-12), when a pipeline that includes a SqlTransform is 
executed with InteractiveRunner(), it fails. An example stack trace is shown 
below:

{code:python}
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-ae3e3b2a5b98> in <module>
----> 1 ib.show(pcoll)

~/beam/sdks/python/apache_beam/runners/interactive/utils.py in 
run_within_progress_indicator(*args, **kwargs)
    226   def run_within_progress_indicator(*args, **kwargs):
    227     with ProgressIndicator('Processing...', 'Done.'):
--> 228       return func(*args, **kwargs)
    229 
    230   return run_within_progress_indicator

~/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py in 
show(*pcolls, **configs)
    484   recording_manager = ie.current_env().get_recording_manager(
    485       user_pipeline, create_if_absent=True)
--> 486   recording = recording_manager.record(pcolls, max_n=n, 
max_duration=duration)
    487 
    488   # Catch a KeyboardInterrupt to gracefully cancel the recording and

~/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py in 
record(self, pcolls, max_n, max_duration)
    420     # arbitrary variables.
    421     self._watch(pcolls)
--> 422     pipeline_instrument = pi.PipelineInstrument(self.user_pipeline)
    423     self.record_pipeline()
    424 

~/beam/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py in 
__init__(self, pipeline, options)
    113     # proto is stable. The snapshot of pipeline will not be mutated 
within this
    114     # module and can be used to recover original pipeline if needed.
--> 115     self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
    116         pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, 
options)
    117     ie.current_env().add_derived_pipeline(self._pipeline, 
self._pipeline_snap)

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, runner, 
options, return_context)
    900     if proto.root_transform_ids:
    901       root_transform_id, = proto.root_transform_ids
--> 902       p.transforms_stack = 
[context.transforms.get_by_id(root_transform_id)]
    903     else:
    904       p.transforms_stack = [AppliedPTransform(None, None, '', None)]

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, 
id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, 
id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, 
id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, 
id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, 
id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, 
id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1227     ]
   1228 
-> 1229     transform = ptransform.PTransform.from_runner_api(proto, context)
   1230     # Ordering is important here.
   1231     # TODO(BEAM-9635): use key, value pairs instead of depending on 
tags with

~/beam/sdks/python/apache_beam/transforms/ptransform.py in from_runner_api(cls, 
proto, context)
    728     parameter_type, constructor = cls._known_urns[proto.spec.urn]
    729 
--> 730     return constructor(
    731         proto,
    732         proto_utils.parse_Bytes(proto.spec.payload, parameter_type),

~/beam/sdks/python/apache_beam/transforms/core.py in 
from_runner_api_parameter(unused_ptransform, pardo_payload, context)
   1417   def from_runner_api_parameter(unused_ptransform, pardo_payload, 
context):
   1418     fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
-> 1419         DoFnInfo.from_runner_api(
   1420             pardo_payload.do_fn, context).serialized_dofn_data())
   1421     if si_tags_and_types:

~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api(spec, 
unused_context)
   1491       return StatelessDoFnInfo(spec.urn)
   1492     else:
-> 1493       raise ValueError('Unexpected DoFn type: %s' % spec.urn)
   1494 
   1495   @staticmethod

ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
{code}


> InteractiveRunner cannot execute pipeline with cross-language transform
> -----------------------------------------------------------------------
>
>                 Key: BEAM-10708
>                 URL: https://issues.apache.org/jira/browse/BEAM-10708
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language
>            Reporter: Brian Hulette
>            Assignee: Ning Kang
>            Priority: P2
>
> The InteractiveRunner crashes when given a pipeline that includes a 
> cross-language transform.
> Here's the example I tried to run in a Jupyter notebook:
> {code:python}
> p = beam.Pipeline(InteractiveRunner())
> pc = (p | SqlTransform("""SELECT
>             CAST(1 AS INT) AS `id`,
>             CAST('foo' AS VARCHAR) AS `str`,
>             CAST(3.14  AS DOUBLE) AS `flt`"""))
> df = interactive_beam.collect(pc)
> {code}
> The problem occurs when 
> [pipeline_fragment.py|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L66]
>  creates a copy of the pipeline by [writing it to proto and reading it 
> back|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L120].
>  Reading it back fails because some of the pipeline is not written in Python.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to