Hello,

I'm playing with the interactive runner in a notebook, with the Flink runner
as the underlying runner. I wonder whether it can read messages from
Kafka. I checked the example notebook
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>
and
it works. However, reading messages from Kafka fails with the following error.

 KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'

Cheers,
Jaehyeon

*Here is the source.*

import os

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.portability import flink_runner

# decode_message, parse_json and EventLog are defined elsewhere in the notebook.

pipeline_opts = {
    "job_name": "kafka-io",
    "environment_type": "LOOPBACK",
    "streaming": True,
    "parallelism": 3,
    "experiments": [
        "use_deprecated_read"
    ],  ## https://github.com/apache/beam/issues/20979
    "checkpointing_interval": "60000",
}
options = PipelineOptions([], **pipeline_opts)
# Required; otherwise it complains when importing worker functions
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(
    interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
    options=options,
)
events = (
    p
    | "Read from Kafka"
    >> kafka.ReadFromKafka(
        consumer_config={
            "bootstrap.servers": os.getenv(
                "BOOTSTRAP_SERVERS",
                "host.docker.internal:29092",
            ),
            "auto.offset.reset": "earliest",
            # "enable.auto.commit": "true",
            "group.id": "kafka-io",
        },
        topics=["website-visit"],
    )
    | "Decode messages" >> beam.Map(decode_message)
    | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
)
result = p.run()
result.wait_until_finish()
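
For completeness, the commented-out lines at the end of the failing cell are
roughly how I intend to inspect the unbounded source with Interactive Beam once
the read works (ib here is apache_beam.runners.interactive.interactive_beam):

from apache_beam.runners.interactive import interactive_beam as ib

# Record the unbounded Kafka source for a bounded amount of time,
# then materialize and display the captured elements.
ib.options.recording_duration = "120s"
ib.show(events)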

*And here is the full error message.*

WARNING:apache_beam.options.pipeline_options:Discarding invalid
overrides: {'checkpointing_interval': '60000'}

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[17], line 36
     15 p = beam.Pipeline(
     16     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
     17 )
     18 events = (
     19     p
     20     | "Read from Kafka"
   (...)
     34     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
     35 )
---> 36 results = p.run()
     37 result.wait_until_finish()
     38 # ib.options.recording_duration = "120s"
     39 # ib.show(events)

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586, in Pipeline.run(self, test_runner_api)
    584     finally:
    585       shutil.rmtree(tmpdir)
--> 586   return self.runner.run_pipeline(self, self._options)
    587 finally:
    588   if not is_in_ipython():

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148, in InteractiveRunner.run_pipeline(self, pipeline, options)
    145 if isinstance(self._underlying_runner, FlinkRunner):
    146   self.configure_for_flink(user_pipeline, options)
--> 148 pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
    150 # The user_pipeline analyzed might be None if the pipeline given has nothing
    151 # to be cached and tracing back to the user defined pipeline is impossible.
    152 # When it's None, there is no need to cache including the background
    153 # caching job and no result to track since no background caching job is
    154 # started at all.
    155 if user_pipeline:
    156   # Should use the underlying runner and run asynchronously.

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756, in build_pipeline_instrument(pipeline, options)
    742 def build_pipeline_instrument(pipeline, options=None):
    743   """Creates PipelineInstrument for a pipeline and its options with cache.
    744
    745   Throughout the process, the returned PipelineInstrument snapshots the given
   (...)
    754   runner pipeline to apply interactivity.
    755   """
--> 756   pi = PipelineInstrument(pipeline, options)
    757   pi.preprocess()
    758   pi.instrument()  # Instruments the pipeline only once.

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71, in PipelineInstrument.__init__(self, pipeline, options)
     67 if background_caching_job.has_source_to_cache(self._user_pipeline):
     68   self._cache_manager = ie.current_env().get_cache_manager(
     69       self._user_pipeline)
---> 71 self._background_caching_pipeline = beam.pipeline.Pipeline.from_runner_api(
     72     pipeline.to_runner_api(), pipeline.runner, options)
     73 ie.current_env().add_derived_pipeline(
     74     self._pipeline, self._background_caching_pipeline)
     76 # Snapshot of original pipeline information.

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020, in Pipeline.from_runner_api(proto, runner, options, return_context)
   1018 if proto.root_transform_ids:
   1019   root_transform_id, = proto.root_transform_ids
-> 1020   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
   1021 else:
   1022   p.transforms_stack = [AppliedPTransform(None, None, '', None)]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114, in _PipelineContextMap.get_by_id(self, id)
    111 def get_by_id(self, id):
    112   # type: (str) -> PortableObjectT
    113   if id not in self._id_to_obj:
--> 114     self._id_to_obj[id] = self._obj_type.from_runner_api(
    115         self._id_to_proto[id], self._pipeline_context)
    116   return self._id_to_obj[id]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456, in AppliedPTransform.from_runner_api(proto, context)
   1454 result.parts = []
   1455 for transform_id in proto.subtransforms:
-> 1456   part = context.transforms.get_by_id(transform_id)
   1457   part.parent = result
   1458   result.add_part(part)

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114, in _PipelineContextMap.get_by_id(self, id)
    111 def get_by_id(self, id):
    112   # type: (str) -> PortableObjectT
    113   if id not in self._id_to_obj:
--> 114     self._id_to_obj[id] = self._obj_type.from_runner_api(
    115         self._id_to_proto[id], self._pipeline_context)
    116   return self._id_to_obj[id]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456, in AppliedPTransform.from_runner_api(proto, context)
   1454 result.parts = []
   1455 for transform_id in proto.subtransforms:
-> 1456   part = context.transforms.get_by_id(transform_id)
   1457   part.parent = result
   1458   result.add_part(part)

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114, in _PipelineContextMap.get_by_id(self, id)
    111 def get_by_id(self, id):
    112   # type: (str) -> PortableObjectT
    113   if id not in self._id_to_obj:
--> 114     self._id_to_obj[id] = self._obj_type.from_runner_api(
    115         self._id_to_proto[id], self._pipeline_context)
    116   return self._id_to_obj[id]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426, in AppliedPTransform.from_runner_api(proto, context)
   1419 side_input_tags = []
   1421 main_inputs = {
   1422     tag: context.pcollections.get_by_id(id)
   1423     for (tag, id) in proto.inputs.items() if tag not in side_input_tags
   1424 }
-> 1426 transform = ptransform.PTransform.from_runner_api(proto, context)
   1427 if transform and proto.environment_id:
   1428   resource_hints = context.environments.get_by_id(
   1429       proto.environment_id).resource_hints()

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769, in PTransform.from_runner_api(cls, proto, context)
    767 if proto is None or proto.spec is None or not proto.spec.urn:
    768   return None
--> 769 parameter_type, constructor = cls._known_urns[proto.spec.urn]
    771 return constructor(
    772     proto,
    773     proto_utils.parse_Bytes(proto.spec.payload, parameter_type),
    774     context)

KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
