I would suggest also including a more recent fix [1] or using
the workaround mentioned in [2].
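
For completeness, here is the general shape of that workaround as I
understand it; treat it as a sketch rather than the authoritative steps in
[2]. The traceback fails inside the to_runner_api()/from_runner_api()
round-trip that Pipeline.run performs as a self-check, so building the
pipeline without the `with` block and calling run(test_runner_api=False)
skips that round-trip:

import apache_beam as beam
from apache_beam.io.kafka import WriteToKafka

# Sketch only (not necessarily the exact workaround in [2]): skip the
# runner-API round-trip self-check, which cannot deserialize the Java DoFn.
# pipeline_options and the placeholder values are as in the original script.
p = beam.Pipeline(options=pipeline_options)
_ = (p | 'Create' >> beam.Create(['Start'])
       # ... Read MDAU / Predict / EncodeThrift steps as before ...
       | 'WriteToKafka' >> WriteToKafka(
           producer_config={'bootstrap.servers': '<fitbit-bootstrap-server>:9092'},
           topic='<internal_fitbit_topic>'))
result = p.run(test_runner_api=False)
result.wait_until_finish()

If you upgrade instead, note that --sdk_harness_container_image_overrides
in your command pins the Java harness image to 2.26.0; it should presumably
move in lockstep with whatever SDK version you install.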

Thanks,
Cham

[1] https://github.com/apache/beam/pull/14306
[2]
https://issues.apache.org/jira/browse/BEAM-11862?focusedCommentId=17305920&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305920

On Tue, Mar 30, 2021 at 1:23 PM Brian Hulette <[email protected]> wrote:

> +Chamikara Jayalath <[email protected]>
>
> Could you try with Beam 2.27.0 or 2.28.0? I think this PR [1] may
> have addressed the issue. It avoids the problematic code when the pipeline
> is multi-language [2].
>
> [1] https://github.com/apache/beam/pull/13536
> [2]
> https://github.com/apache/beam/blob/7eff49fae34e8d3c50716f5da14fa6bcc607fc67/sdks/python/apache_beam/pipeline.py#L524
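>
> To make the mechanism concrete (a paraphrase from memory, not the exact
> Beam source): Pipeline.run() round-trips the pipeline through its proto
> representation as a self-check, and the Python SDK cannot reconstruct a
> Java DoFn from that proto, which is what raises "Unexpected DoFn type:
> beam:dofn:javasdk:0.1". The fix in [1] guards that round-trip roughly
> like this:
>
> def run(self, test_runner_api='AUTO'):
>   if test_runner_api == 'AUTO':
>     # Skip the to_runner_api()/from_runner_api() self-check when the
>     # pipeline contains cross-language (external) transforms, since
>     # their Java DoFns cannot be deserialized back into Python.
>     test_runner_api = not self.contains_external_transforms
>   if test_runner_api:
>     return Pipeline.from_runner_api(
>         self.to_runner_api(), self.runner, self._options).run(False)
>   ...  # otherwise hand the pipeline object to the runner directly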
>
> On Tue, Mar 30, 2021 at 12:55 PM Maria-Irina Sandu <[email protected]>
> wrote:
>
>> I'm trying to write to a Kafka topic using the WriteToKafka transform from
>> apache_beam.io.kafka.
>> The error I get is:
>>
>>> File "predict.py", line 162, in <module>
>>> run()
>>> File "predict.py", line 158, in run
>>> topic = 'int.fitbit_explore.video_recommendations'))
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 580, in __exit__
>>> self.result = self.run()
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 530, in run
>>> self._options).run(False)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 902, in from_runner_api
>>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>>> line 116, in get_by_id
>>> self._id_to_proto[id], self._pipeline_context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 1252, in from_runner_api
>>> part = context.transforms.get_by_id(transform_id)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>>> line 116, in get_by_id
>>> self._id_to_proto[id], self._pipeline_context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 1252, in from_runner_api
>>> part = context.transforms.get_by_id(transform_id)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>>> line 116, in get_by_id
>>> self._id_to_proto[id], self._pipeline_context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 1252, in from_runner_api
>>> part = context.transforms.get_by_id(transform_id)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>>> line 116, in get_by_id
>>> self._id_to_proto[id], self._pipeline_context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 1252, in from_runner_api
>>> part = context.transforms.get_by_id(transform_id)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>>> line 116, in get_by_id
>>> self._id_to_proto[id], self._pipeline_context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 1229, in from_runner_api
>>> transform = ptransform.PTransform.from_runner_api(proto, context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>> line 733, in from_runner_api
>>> context)
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
>>> line 1420, in from_runner_api_parameter
>>> pardo_payload.do_fn, context).serialized_dofn_data())
>>> File
>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
>>> line 1493, in from_runner_api
>>> raise ValueError('Unexpected DoFn type: %s' % spec.urn)
>>> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
>>
>>
>> The pipeline looks like this:
>>
>>> pipeline_options = PipelineOptions(argv)
>>> with beam.Pipeline(options=pipeline_options) as p:
>>>   _ = (p | 'Create' >> beam.Create(['Start'])
>>>          | 'Read MDAU' >> beam.io.textio.ReadFromText(
>>>              "gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>>>          | 'Predict' >> beam.ParDo(PredictDoFn())
>>>          | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>>>          | 'WriteToKafka' >> WriteToKafka(
>>>              producer_config={'bootstrap.servers': '<fitbit-bootstrap-server>:9092'},
>>>              topic='<internal_fitbit_topic>'))
>>
>>
>> I've replaced the bootstrap server and topic values with placeholders here,
>> since I'm not sure whether I should share them publicly.
>>
>> The ThriftEncodeDoFn itself seems to work; it produces a tuple of bytes.
>> It looks like this:
>>
>> from typing import Iterable, Tuple
>>
>> import apache_beam as beam
>> from apache_beam.pvalue import TaggedOutput
>> from thrift.protocol import TBinaryProtocol
>> from thrift.transport import TTransport
>>
>> # VideosAndRatings and RecommendationsKafkaMessage are imported from
>> # our Thrift-generated module.
>>
>> class ThriftEncodeDoFn(beam.DoFn):
>>   def encode(self, element):
>>     video = VideosAndRatings()
>>     video.videoId = str(element['videoId'])
>>     video.rating = 5
>>     video.index = 1
>>     videosList = [video]
>>     recommendations = RecommendationsKafkaMessage()
>>     recommendations.userId = str(element['userId'])
>>     recommendations.videos = videosList
>>     recommendations.category = "DISCOVER_WORKOUTS"
>>     print(recommendations.userId, recommendations.category)
>>     trans = TTransport.TMemoryBuffer()
>>     proto = TBinaryProtocol.TBinaryProtocol(trans)
>>     recommendations.write(proto)
>>     encoded_data = bytes(trans.getvalue())
>>     encoded_key = str(element['userId']).encode()
>>     return encoded_key, encoded_data
>>
>>   def process(self, element) -> Iterable[Tuple[bytes, bytes]]:
>>     try:
>>       encoded_key, encoded_data = self.encode(element)
>>       yield (encoded_key, encoded_data)
>>     except Exception as e:
>>       print("encoding didn't work", e)
>>       yield TaggedOutput('encode_errors', f'element={element}, error={e}')
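>>
>> (A side note, in case it matters: as the pipeline is written above, only
>> the DoFn's main output is consumed, so the 'encode_errors' elements are
>> silently dropped. A minimal sketch of capturing them, with illustrative
>> names ('inputs' stands for the upstream PCollection):
>>
>> results = inputs | 'EncodeThrift' >> beam.ParDo(
>>     ThriftEncodeDoFn()).with_outputs('encode_errors', main='encoded')
>> encoded = results.encoded
>> encode_errors = results.encode_errors
>>
>> This is unrelated to the ValueError itself.)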
>>
>> The command I use to run the pipeline is this:
>>
>> python3 predict.py \
>>   --work-dir gs://fit-recommend-system-testpy/saved_model \
>>   --batch \
>>   --project fit-recommend-system-int \
>>   --runner DataflowRunner \
>>   --setup_file ./setup.py \
>>   --subnetwork https://www.googleapis.com/compute/v1/projects/<fitbit-internal-subnetwork> \
>>   --job_name prediction \
>>   --region us-central1 \
>>   --temp_location gs://fit-recommend-system-testpy/temp \
>>   --staging_location gs://fit-recommend-system-testpy/staging \
>>   --no_use_public_ips \
>>   --sdk_harness_container_image_overrides ".*java.*,gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:2.26.0" \
>>   --service_account_email [email protected]
>>
>> And I installed Apache Beam with python3 -m pip install apache_beam[gcp]==2.26.0.
>>
>> Any help with this is much appreciated!
>>
>> Best regards,
>>
>> Irina
>>
>>
