Hello,
I'm playing with the interactive runner in a notebook, with the Flink runner
as the underlying runner. I wonder whether it can read messages from
Kafka. I checked the example notebook
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>
and
it works. However, when I try to read Kafka messages, the pipeline fails with the following error:
KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
Cheers,
Jaehyeon
*Here is the source.*
import os

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.portability import flink_runner

pipeline_opts = {
    "job_name": "kafka-io",
    "environment_type": "LOOPBACK",
    "streaming": True,
    "parallelism": 3,
    "experiments": [
        "use_deprecated_read"
    ],  ## https://github.com/apache/beam/issues/20979
    "checkpointing_interval": "60000",
}
options = PipelineOptions([], **pipeline_opts)
# Required, otherwise the workers complain when importing functions defined in the main session.
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(
    interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
    options=options
)
events = (
    p
    | "Read from Kafka"
    >> kafka.ReadFromKafka(
        consumer_config={
            "bootstrap.servers": os.getenv(
                "BOOTSTRAP_SERVERS",
                "host.docker.internal:29092",
            ),
            "auto.offset.reset": "earliest",
            # "enable.auto.commit": "true",
            "group.id": "kafka-io",
        },
        topics=["website-visit"],
    )
    | "Decode messages" >> beam.Map(decode_message)
    | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
)

results = p.run()
results.wait_until_finish()
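The helpers used above (decode_message, parse_json and the EventLog type) are omitted for brevity; they look roughly like this (a simplified sketch assuming the Kafka values are JSON, with illustrative field names):

import json
import typing

class EventLog(typing.NamedTuple):
    # Illustrative fields only; the real type matches the website-visit events.
    id: str
    page: str
    event_datetime: str

def decode_message(kafka_kv):
    # ReadFromKafka (without metadata) emits (key, value) pairs of bytes; keep the value.
    return kafka_kv[1].decode("utf-8")

def parse_json(element):
    # Parse the JSON payload into the typed element used downstream.
    row = json.loads(element)
    return EventLog(**{k: row[k] for k in EventLog._fields})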
*And here is the full error message.*
WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'checkpointing_interval': '60000'}
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[17], line 36
     15 p = beam.Pipeline(
     16     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
     17 )
     18 events = (
     19     p
     20     | "Read from Kafka"
   (...)
     34     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
     35 )
---> 36 results = p.run()
     37 result.wait_until_finish()
     38 # ib.options.recording_duration = "120s"
     39 # ib.show(events)

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586, in Pipeline.run(self, test_runner_api)
    584   finally:
    585     shutil.rmtree(tmpdir)
--> 586   return self.runner.run_pipeline(self, self._options)
    587 finally:
    588   if not is_in_ipython():

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148, in InteractiveRunner.run_pipeline(self, pipeline, options)
    145 if isinstance(self._underlying_runner, FlinkRunner):
    146   self.configure_for_flink(user_pipeline, options)
--> 148 pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
    150 # The user_pipeline analyzed might be None if the pipeline given has nothing
    151 # to be cached and tracing back to the user defined pipeline is impossible.
    152 # When it's None, there is no need to cache including the background
    153 # caching job and no result to track since no background caching job is
    154 # started at all.
    155 if user_pipeline:
    156   # Should use the underlying runner and run asynchronously.

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756, in build_pipeline_instrument(pipeline, options)
    742 def build_pipeline_instrument(pipeline, options=None):
    743   """Creates PipelineInstrument for a pipeline and its options with cache.
    744
    745   Throughout the process, the returned PipelineInstrument snapshots the given
   (...)
    754   runner pipeline to apply interactivity.
    755   """
--> 756   pi = PipelineInstrument(pipeline, options)
    757   pi.preprocess()
    758   pi.instrument()  # Instruments the pipeline only once.

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71, in PipelineInstrument.__init__(self, pipeline, options)
     67 if background_caching_job.has_source_to_cache(self._user_pipeline):
     68   self._cache_manager = ie.current_env().get_cache_manager(
     69       self._user_pipeline)
---> 71 self._background_caching_pipeline = beam.pipeline.Pipeline.from_runner_api(
     72     pipeline.to_runner_api(), pipeline.runner, options)
     73 ie.current_env().add_derived_pipeline(
     74     self._pipeline, self._background_caching_pipeline)
     76 # Snapshot of original pipeline information.

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020, in Pipeline.from_runner_api(proto, runner, options, return_context)
   1018 if proto.root_transform_ids:
   1019   root_transform_id, = proto.root_transform_ids
-> 1020   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
   1021 else:
   1022   p.transforms_stack = [AppliedPTransform(None, None, '', None)]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114, in _PipelineContextMap.get_by_id(self, id)
    111 def get_by_id(self, id):
    112   # type: (str) -> PortableObjectT
    113   if id not in self._id_to_obj:
--> 114     self._id_to_obj[id] = self._obj_type.from_runner_api(
    115         self._id_to_proto[id], self._pipeline_context)
    116   return self._id_to_obj[id]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456, in AppliedPTransform.from_runner_api(proto, context)
   1454 result.parts = []
   1455 for transform_id in proto.subtransforms:
-> 1456   part = context.transforms.get_by_id(transform_id)
   1457   part.parent = result
   1458   result.add_part(part)

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114, in _PipelineContextMap.get_by_id(self, id)
    111 def get_by_id(self, id):
    112   # type: (str) -> PortableObjectT
    113   if id not in self._id_to_obj:
--> 114     self._id_to_obj[id] = self._obj_type.from_runner_api(
    115         self._id_to_proto[id], self._pipeline_context)
    116   return self._id_to_obj[id]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456, in AppliedPTransform.from_runner_api(proto, context)
   1454 result.parts = []
   1455 for transform_id in proto.subtransforms:
-> 1456   part = context.transforms.get_by_id(transform_id)
   1457   part.parent = result
   1458   result.add_part(part)

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114, in _PipelineContextMap.get_by_id(self, id)
    111 def get_by_id(self, id):
    112   # type: (str) -> PortableObjectT
    113   if id not in self._id_to_obj:
--> 114     self._id_to_obj[id] = self._obj_type.from_runner_api(
    115         self._id_to_proto[id], self._pipeline_context)
    116   return self._id_to_obj[id]

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426, in AppliedPTransform.from_runner_api(proto, context)
   1419 side_input_tags = []
   1421 main_inputs = {
   1422     tag: context.pcollections.get_by_id(id)
   1423     for (tag, id) in proto.inputs.items() if tag not in side_input_tags
   1424 }
-> 1426 transform = ptransform.PTransform.from_runner_api(proto, context)
   1427 if transform and proto.environment_id:
   1428   resource_hints = context.environments.get_by_id(
   1429       proto.environment_id).resource_hints()

File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769, in PTransform.from_runner_api(cls, proto, context)
    767 if proto is None or proto.spec is None or not proto.spec.urn:
    768   return None
--> 769 parameter_type, constructor = cls._known_urns[proto.spec.urn]
    771 return constructor(
    772     proto,
    773     proto_utils.parse_Bytes(proto.spec.payload, parameter_type),
    774     context)

KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'