Dataflow multi-language pipelines need Runner v2, so you have to specify the pipeline option "--experiments=use_runner_v2". Please see the example at <https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi> for an exact command.
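For reference, based on the submission command in your original message below, that would look roughly like this (just a sketch; only the --experiments flag is new, the other flags are copied from your command, placeholders included):

  python3 predict.py \
    --work-dir gs://fit-recommend-system-testpy/saved_model \
    --batch \
    --project fit-recommend-system-int \
    --runner DataflowRunner \
    --experiments=use_runner_v2 \
    --setup_file ./setup.py \
    ... (rest of your flags unchanged)

Equivalently, if you prefer setting it in code, you can append the flag to the arguments you already pass to PipelineOptions (again a sketch, assuming argv is the flag list you construct today):

  pipeline_options = PipelineOptions(argv + ['--experiments=use_runner_v2'])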
On Wed, Mar 31, 2021 at 1:06 PM Maria-Irina Sandu <[email protected]> wrote:

> Thanks all for replying!
> I tried with both 2.27.0 and 2.28.0 and the error was the same. I managed
> to make some progress using the second option proposed by Cham and am now
> getting the following error:
>
>> Traceback (most recent call last):
>>   File "predict.py", line 163, in <module>
>>     run()
>>   File "predict.py", line 159, in run
>>     p.run(False).wait_until_finish()
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 559, in run
>>     return self.runner.run_pipeline(self, self._options)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 638, in run_pipeline
>>     self.dataflow_client.create_job(self.job), self)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 260, in wrapper
>>     return fun(*args, **kwargs)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job
>>     return self.submit_job_description(job)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 260, in wrapper
>>     return fun(*args, **kwargs)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 747, in submit_job_description
>>     response = self._client.projects_locations_jobs.Create(request)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py", line 667, in Create
>>     config, request, global_params=global_params)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
>>     return self.ProcessHttpResponse(method_config, http_response, request)
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
>>     self.__ProcessHttpResponse(method_config, http_response, request))
>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
>>     http_response, method_config=method_config, request=request)
>> apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://dataflow.googleapis.com/v1b3/projects/fit-recommend-system-int/locations/us-central1/jobs?alt=json>:
>> response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Wed, 31 Mar 2021 19:14:32 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'alt-svc': 'h3-29=":443"; ma=2592000,h3-T051=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '288', '-content-encoding': 'gzip'}>, content <{
>>   "error": {
>>     "code": 400,
"message": "Dataflow Runner v2 requires a valid FnApi job, Please >> resubmit your job with a valid configuration. Note that if using Templates, >> you may need to regenerate your template with the '--use_runner_v2'.", >> "status": "INVALID_ARGUMENT" >> } >> } > > > > On Tue, Mar 30, 2021 at 11:27 PM Chamikara Jayalath <[email protected]> > wrote: > >> I would suggest also including a more recent fix [1] or using >> the workaround mentioned in [2]. >> >> Thanks, >> Cham >> >> [1] https://github.com/apache/beam/pull/14306 >> [2] >> https://issues.apache.org/jira/browse/BEAM-11862?focusedCommentId=17305920&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305920 >> >> On Tue, Mar 30, 2021 at 1:23 PM Brian Hulette <[email protected]> >> wrote: >> >>> +Chamikara Jayalath <[email protected]> >>> >>> Could you try with beam 2.27.0 or 2.28.0? I think that this PR [1] may >>> have addressed the issue. It avoids the problematic code when the pipeline >>> is multi-language [2]. >>> >>> [1] https://github.com/apache/beam/pull/13536 >>> [2] >>> https://github.com/apache/beam/blob/7eff49fae34e8d3c50716f5da14fa6bcc607fc67/sdks/python/apache_beam/pipeline.py#L524 >>> >>> On Tue, Mar 30, 2021 at 12:55 PM Maria-Irina Sandu <[email protected]> >>> wrote: >>> >>>> I'm trying to write to a Kafka topic using WriteTokafka module from >>>> apache_beam.io.kafka. >>>> The error I get is: >>>> >>>>> File "predict.py", line 162, in <module> >>>>> run() >>>>> File "predict.py", line 158, in run >>>>> topic = 'int.fitbit_explore.video_recommendations')) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 580, in __exit__ >>>>> self.result = self.run() >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 530, in run >>>>> self._options).run(False) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 902, in from_runner_api >>>>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >>>>> line 116, in get_by_id >>>>> self._id_to_proto[id], self._pipeline_context) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 1252, in from_runner_api >>>>> part = context.transforms.get_by_id(transform_id) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >>>>> line 116, in get_by_id >>>>> self._id_to_proto[id], self._pipeline_context) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 1252, in from_runner_api >>>>> part = context.transforms.get_by_id(transform_id) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >>>>> line 116, in get_by_id >>>>> self._id_to_proto[id], self._pipeline_context) >>>>> File >>>>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 1252, in from_runner_api >>>>> part = 
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1229, in from_runner_api
>>>>>     transform = ptransform.PTransform.from_runner_api(proto, context)
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 733, in from_runner_api
>>>>>     context)
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1420, in from_runner_api_parameter
>>>>>     pardo_payload.do_fn, context).serialized_dofn_data())
>>>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>>>>>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
>>>>> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
>>>>
>>>> The pipeline looks like this:
>>>>
>>>>> pipeline_options = PipelineOptions(argv)
>>>>> with beam.Pipeline(options=pipeline_options) as p:
>>>>>     _ = (p | 'Create' >> beam.Create(['Start'])
>>>>>            | 'Read MDAU' >> beam.io.textio.ReadFromText("gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>>>>>            | 'Predict' >> beam.ParDo(PredictDoFn())
>>>>>            | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>>>>>            | 'WriteToKafka' >> WriteToKafka(
>>>>>                  producer_config = {'bootstrap.servers' : '<fitbit-bootstrap-server>:9092'},
>>>>>                  topic = '<internal_fitbit_topic>'))
>>>>
>>>> I replaced the bootstrap server and topic values with placeholders here
>>>> because I'm not sure if I should show them or not.
>>>>
>>>> The ThriftEncodeDoFn function seems to work.
>>>> It produces a tuple of bytes and it looks like this:
>>>>
>>>> class ThriftEncodeDoFn(beam.DoFn):
>>>>     def encode(self, element):
>>>>         video = VideosAndRatings()
>>>>         video.videoId = str(element['videoId'])
>>>>         video.rating = 5
>>>>         video.index = 1
>>>>         videosList = [video]
>>>>         recommendations = RecommendationsKafkaMessage()
>>>>         recommendations.userId = str(element['userId'])
>>>>         recommendations.videos = videosList
>>>>         recommendations.category = "DISCOVER_WORKOUTS"
>>>>         print(recommendations.userId, recommendations.category)
>>>>         trans = TTransport.TMemoryBuffer()
>>>>         proto = TBinaryProtocol.TBinaryProtocol(trans)
>>>>         recommendations.write(proto)
>>>>         encoded_data = bytes(trans.getvalue())
>>>>         encoded_key = str(element['userId']).encode()
>>>>         return encoded_key, encoded_data
>>>>
>>>>     def process(self, element) -> Iterable[Tuple[bytes, bytes]]:
>>>>         try:
>>>>             encoded_key, encoded_data = self.encode(element)
>>>>             yield (encoded_key, encoded_data)
>>>>         except Exception as e:
>>>>             print("encoding didn't work", e)
>>>>             yield TaggedOutput('encode_errors', f'element={element}, error={e}')
>>>>
>>>> The command I use to run the pipeline is this:
>>>>
>>>> python3 predict.py \
>>>>   --work-dir gs://fit-recommend-system-testpy/saved_model \
>>>>   --batch \
>>>>   --project fit-recommend-system-int \
>>>>   --runner DataflowRunner \
>>>>   --setup_file ./setup.py \
>>>>   --subnetwork https://www.googleapis.com/compute/v1/projects/<fitbit-internal-subnetwork> \
>>>>   --job_name prediction \
>>>>   --region us-central1 \
>>>>   --temp_location gs://fit-recommend-system-testpy/temp \
>>>>   --staging_location gs://fit-recommend-system-testpy/staging \
>>>>   --no_use_public_ips \
>>>>   --sdk_harness_container_image_overrides ".*java.*,gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:2.26.0" \
>>>>   --service_account_email [email protected]
>>>>
>>>> And I have installed Apache Beam with python3 -m pip install apache_beam[gcp]==2.26.0.
>>>>
>>>> Any help with this is much appreciated!
>>>>
>>>> Best regards,
>>>>
>>>> Irina
