Hello, I'm experimenting with the interactive runner in a notebook, with the Flink runner as the underlying runner, and I wonder whether it can read messages from Kafka. I checked the example notebook <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>, and it works. However, when I try to read Kafka messages, the pipeline fails with the following error.
KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'

Cheers,
Jaehyeon

*Here is the source.*

pipeline_opts = {
    "job_name": "kafka-io",
    "environment_type": "LOOPBACK",
    "streaming": True,
    "parallelism": 3,
    "experiments": [
        "use_deprecated_read"
    ],  ## https://github.com/apache/beam/issues/20979
    "checkpointing_interval": "60000",
}
options = PipelineOptions([], **pipeline_opts)
# Required, else it will complain that when importing worker functions
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(
    interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
)
events = (
    p
    | "Read from Kafka"
    >> kafka.ReadFromKafka(
        consumer_config={
            "bootstrap.servers": os.getenv(
                "BOOTSTRAP_SERVERS",
                "host.docker.internal:29092",
            ),
            "auto.offset.reset": "earliest",
            # "enable.auto.commit": "true",
            "group.id": "kafka-io",
        },
        topics=["website-visit"],
    )
    | "Decode messages" >> beam.Map(decode_message)
    | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
)
results = p.run()
result.wait_until_finish()

*And here is the full error message.*

WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'checkpointing_interval': '60000'}
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[17], line 36
     15 p = beam.Pipeline(
     16     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
     17 )
     18 events = (
     19     p
     20     | "Read from Kafka"
 (...)
34 | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog) 35 )---> 36 results = p.run() 37 result.wait_until_finish() 38 # ib.options.recording_duration = "120s" 39 # ib.show(events) File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=585>, in Pipeline.run(self, test_runner_api) 584 finally: 585 shutil.rmtree(tmpdir)--> 586 return self.runner.run_pipeline(self, self._options) 587 finally: 588 if not is_in_ipython(): File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py#line=147>, in InteractiveRunner.run_pipeline(self, pipeline, options) 145 if isinstance(self._underlying_runner, FlinkRunner): 146 self.configure_for_flink(user_pipeline, options)--> 148 pipeline_instrument = inst.build_pipeline_instrument(pipeline, options) 150 # The user_pipeline analyzed might be None if the pipeline given has nothing 151 # to be cached and tracing back to the user defined pipeline is impossible. 152 # When it's None, there is no need to cache including the background 153 # caching job and no result to track since no background caching job is 154 # started at all. 155 if user_pipeline: 156 # Should use the underlying runner and run asynchronously. 
File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=755>, in build_pipeline_instrument(pipeline, options) 742 def build_pipeline_instrument(pipeline, options=None): 743 """Creates PipelineInstrument for a pipeline and its options with cache. 744 745 Throughout the process, the returned PipelineInstrument snapshots the given (...) 754 runner pipeline to apply interactivity. 755 """--> 756 pi = PipelineInstrument(pipeline, options) 757 pi.preprocess() 758 pi.instrument() # Instruments the pipeline only once. File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=70>, in PipelineInstrument.__init__(self, pipeline, options) 67 if background_caching_job.has_source_to_cache(self._user_pipeline): 68 self._cache_manager = ie.current_env().get_cache_manager( 69 self._user_pipeline)---> 71 self._background_caching_pipeline = beam.pipeline.Pipeline.from_runner_api( 72 pipeline.to_runner_api(), pipeline.runner, options) 73 ie.current_env().add_derived_pipeline( 74 self._pipeline, self._background_caching_pipeline) 76 # Snapshot of original pipeline information. 
File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1019>, in Pipeline.from_runner_api(proto, runner, options, return_context) 1018 if proto.root_transform_ids: 1019 root_transform_id, = proto.root_transform_ids-> 1020 p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] 1021 else: 1022 p.transforms_stack = [AppliedPTransform(None, None, '', None)] File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): 112 # type: (str) -> PortableObjectT 113 if id not in self._id_to_obj:--> 114 self._id_to_obj[id] = self._obj_type.from_runner_api( 115 self._id_to_proto[id], self._pipeline_context) 116 return self._id_to_obj[id] File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>, in AppliedPTransform.from_runner_api(proto, context) 1454 result.parts = [] 1455 for transform_id in proto.subtransforms:-> 1456 part = context.transforms.get_by_id(transform_id) 1457 part.parent = result 1458 result.add_part(part) File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): 112 # type: (str) -> PortableObjectT 113 if id not in self._id_to_obj:--> 114 self._id_to_obj[id] = self._obj_type.from_runner_api( 115 
self._id_to_proto[id], self._pipeline_context) 116 return self._id_to_obj[id] File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>, in AppliedPTransform.from_runner_api(proto, context) 1454 result.parts = [] 1455 for transform_id in proto.subtransforms:-> 1456 part = context.transforms.get_by_id(transform_id) 1457 part.parent = result 1458 result.add_part(part) File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): 112 # type: (str) -> PortableObjectT 113 if id not in self._id_to_obj:--> 114 self._id_to_obj[id] = self._obj_type.from_runner_api( 115 self._id_to_proto[id], self._pipeline_context) 116 return self._id_to_obj[id] File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1425>, in AppliedPTransform.from_runner_api(proto, context) 1419 side_input_tags = [] 1421 main_inputs = { 1422 tag: context.pcollections.get_by_id(id) 1423 for (tag, id) in proto.inputs.items() if tag not in side_input_tags 1424 }-> 1426 transform = ptransform.PTransform.from_runner_api(proto, context) 1427 if transform and proto.environment_id: 1428 resource_hints = context.environments.get_by_id( 1429 proto.environment_id).resource_hints() File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769 <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py#line=768>, in 
PTransform.from_runner_api(cls, proto, context) 767 if proto is None or proto.spec is None or not proto.spec.urn: 768 return None--> 769 parameter_type, constructor = cls._known_urns[proto.spec.urn] 771 return constructor( 772 proto, 773 proto_utils.parse_Bytes(proto.spec.payload, parameter_type), 774 context) KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'