+Chamikara Jayalath

Could you try with Beam 2.27.0 or 2.28.0? I think this PR [1] may have addressed the issue: it skips the problematic code path when the pipeline is multi-language [2].
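Before resubmitting, it may help to confirm which SDK version the job will actually pick up. The sketch below is illustrative: `parse_version`, `has_multilanguage_fix` and `MIN_FIXED_VERSION` are hypothetical names, and the 2.27.0 floor is only the earliest release suggested here. It is not a confirmed "first fixed" version for the PR.

```python
from typing import Tuple

# Earliest release suggested in the reply. This is an assumption, not a
# confirmed first-fixed version for apache/beam PR 13536.
MIN_FIXED_VERSION = (2, 27, 0)


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse strings like '2.26.0' or '2.28.0rc1' into (major, minor, patch)."""
    parts = []
    for piece in version.split(".")[:3]:  # keep only major.minor.patch
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break  # stop at suffixes such as 'rc1'
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)  # treat '2.27' as '2.27.0'
    return (parts[0], parts[1], parts[2])


def has_multilanguage_fix(version: str) -> bool:
    """True if `version` is at or above the suggested minimum release."""
    return parse_version(version) >= MIN_FIXED_VERSION


if __name__ == "__main__":
    try:
        import apache_beam
    except ImportError:
        print("apache_beam is not installed in this environment")
    else:
        installed = apache_beam.__version__
        print(installed, "ok" if has_multilanguage_fix(installed) else "upgrade suggested")
```

Running it inside the same virtualenv that submits the job (the one under experiment_kafka/env) shows whether the installed 2.26.0 has been replaced.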
[1] https://github.com/apache/beam/pull/13536
[2] https://github.com/apache/beam/blob/7eff49fae34e8d3c50716f5da14fa6bcc607fc67/sdks/python/apache_beam/pipeline.py#L524

On Tue, Mar 30, 2021 at 12:55 PM Maria-Irina Sandu wrote:

> I'm trying to write to a Kafka topic using the WriteToKafka transform from
> apache_beam.io.kafka.
> The error I get is:
>
>>   File "predict.py", line 162, in <module>
>>     run()
>>   File "predict.py", line 158, in run
>>     topic = 'int.fitbit_explore.video_recommendations'))
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 580, in __exit__
>>     self.result = self.run()
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 530, in run
>>     self._options).run(False)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 902, in from_runner_api
>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1229, in from_runner_api
>>     transform = ptransform.PTransform.from_runner_api(proto, context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 733, in from_runner_api
>>     context)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1420, in from_runner_api_parameter
>>     pardo_payload.do_fn, context).serialized_dofn_data())
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
>> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
>
> The pipeline looks like this:
>
>> import apache_beam as beam
>> from apache_beam.io.kafka import WriteToKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> # Excerpt from run() in predict.py; argv, PredictDoFn and ThriftEncodeDoFn
>> # are defined elsewhere in that file.
>> pipeline_options = PipelineOptions(argv)
>> with beam.Pipeline(options=pipeline_options) as p:
>>     _ = (p | 'Create' >> beam.Create(['Start'])
>>            | 'Read MDAU' >> beam.io.textio.ReadFromText(
>>                "gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>>            | 'Predict' >> beam.ParDo(PredictDoFn())
>>            | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>>            | 'WriteToKafka' >> WriteToKafka(
>>                producer_config={'bootstrap.servers': '<fitbit-bootstrap-server>:9092'},
>>                topic='<internal_fitbit_topic>'))
>
> I replaced the bootstrap server and topic values with placeholders here
> because I'm not sure whether I should share them.
>
> The ThriftEncodeDoFn seems to work: it produces a (key, value) tuple of bytes.
> It looks like this:
>
> from typing import Iterable, Tuple
>
> import apache_beam as beam
> from apache_beam.pvalue import TaggedOutput
> from thrift.protocol import TBinaryProtocol
> from thrift.transport import TTransport
> # VideosAndRatings and RecommendationsKafkaMessage come from the
> # Thrift-generated module for our schema (not shown here).
>
> class ThriftEncodeDoFn(beam.DoFn):
>     def encode(self, element):
>         # Build the Thrift message for one prediction.
>         video = VideosAndRatings()
>         video.videoId = str(element['videoId'])
>         video.rating = 5
>         video.index = 1
>         videosList = [video]
>         recommendations = RecommendationsKafkaMessage()
>         recommendations.userId = str(element['userId'])
>         recommendations.videos = videosList
>         recommendations.category = "DISCOVER_WORKOUTS"
>         print(recommendations.userId, recommendations.category)
>         # Serialize with the binary protocol into an in-memory buffer.
>         trans = TTransport.TMemoryBuffer()
>         proto = TBinaryProtocol.TBinaryProtocol(trans)
>         recommendations.write(proto)
>         encoded_data = bytes(trans.getvalue())
>         encoded_key = str(element['userId']).encode()
>         return encoded_key, encoded_data
>
>     def process(self, element) -> Iterable[Tuple[bytes, bytes]]:
>         try:
>             encoded_key, encoded_data = self.encode(element)
>             yield (encoded_key, encoded_data)
>         except Exception as e:
>             print("encoding didn't work", e)
>             yield TaggedOutput('encode_errors', f'element={element}, error={e}')
>
> The command I use to run the pipeline is:
>
> python3 predict.py \
> --work-dir gs://fit-recommend-system-testpy/saved_model \
> --batch \
> --project fit-recommend-system-int \
> --runner DataflowRunner \
> --setup_file ./setup.py \
> --subnetwork https://www.googleapis.com/compute/v1/projects/<fitbit-internal-subnetwork> \
> --job_name prediction \
> --region us-central1 \
> --temp_location gs://fit-recommend-system-testpy/temp \
> --staging_location gs://fit-recommend-system-testpy/staging \
> --no_use_public_ips \
> --sdk_harness_container_image_overrides ".*java.*,gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:2.26.0" \
> --service_account_email [email protected]
>
> I installed Apache Beam with: python3 -m pip install apache_beam[gcp]==2.26.0
>
> Any help with this is much appreciated!
>
> Best regards,
>
> Irina
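The command above pins the Java SDK harness to 2.26.0 through --sdk_harness_container_image_overrides, whose value Beam documents as a regex and a replacement container image separated by a comma. If the Python SDK is moved to 2.27.0 or 2.28.0, the pinned Java image probably needs to move with it, since a mismatch between the two SDK versions is a likely source of new trouble. The sketch below is a hypothetical helper, not Beam's own implementation: it builds the flag value for a given version and models the matching as "replace the whole image when the regex matches". It also assumes that the gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:<version> naming from the original command exists for newer releases, which is not verified here.

```python
import re
from typing import Tuple

# Image naming copied from the original command; assumed (not verified)
# to be published for other Beam versions too.
JAVA_IMAGE_TEMPLATE = "gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:{version}"


def java_override_for(beam_version: str) -> str:
    """Build a '<regex>,<image>' value for --sdk_harness_container_image_overrides."""
    return ".*java.*," + JAVA_IMAGE_TEMPLATE.format(version=beam_version)


def parse_override(flag_value: str) -> Tuple[str, str]:
    """Split '<regex>,<image>' at the first comma."""
    pattern, image = flag_value.split(",", 1)
    return pattern, image


def resolve_image(default_image: str, flag_value: str) -> str:
    """Simplified model: use the replacement image if the regex matches."""
    pattern, replacement = parse_override(flag_value)
    return replacement if re.search(pattern, default_image) else default_image


if __name__ == "__main__":
    print(java_override_for("2.28.0"))
```

For example, java_override_for("2.28.0") yields the value to pass instead of the 2.26.0 one, while Python SDK images (whose names do not contain "java") are left untouched by the ".*java.*" pattern. The default image names used to exercise resolve_image are illustrative only.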
