[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498 ]
Mark Liu edited comment on BEAM-5953 at 11/13/18 12:12 AM:
-----------------------------------------------------------

To verify that the problem comes from a corrupted proto file, I did two things:

1) Dumped the proto into a local file before staging and read it back the same way DataflowWorkerHarnessHelper.java does. The proto printed successfully (a rough sketch of this check is included at the end of this comment).

2) Replaced the existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with the google-cloud-storage library:

{code:python}
# Upload pipeline.pb via google-cloud-storage instead of apitools.
import logging
import tempfile

from google.cloud import storage

from apache_beam.io.filesystems import FileSystems

# Write the serialized pipeline proto to a local temp file.
temp_file = tempfile.NamedTemporaryFile()
with open(temp_file.name, 'wb') as f:
  f.write(job.proto_pipeline.SerializeToString())

# Split gs://<bucket>/<object> into bucket and object names.
gcs_location = FileSystems.join(
    job.google_cloud_options.staging_location, 'pipeline.pb')
bucket_name, name = gcs_location[5:].split('/', 1)

logging.info('Starting GCS upload to %s...', gcs_location)
client = storage.Client(project='my-project')
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(name)
blob.upload_from_filename(temp_file.name)
{code}

With this change, the runner harness started successfully; see [this job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe]. (That job ended up hanging because SDK harness startup failed, which is unrelated to this error; the [harness-startup log|https://pantheon.corp.google.com/logs/viewer?resource=dataflow_step%2Fjob_id%2F2018-11-12_15_25_55-9696772999324855617&logName=projects%2Fgoogle.com:clouddfe%2Flogs%2Fdataflow.googleapis.com%252Fharness-startup&interval=NO_LIMIT&project=google.com:clouddfe&minLogLevel=0&expandAll=false&timestamp=2018-11-13T00:11:14.449000000Z&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2018-11-12T23:26:40.731883000Z] shows the successful startup messages.)

This could be another argument for moving the SDK off google-apitools (https://issues.apache.org/jira/browse/BEAM-4850).
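For completeness, here is a minimal sketch of the step-1 round-trip check. It assumes the same {{job.proto_pipeline}} message as the snippet above and the proto module shipped with the Python SDK ({{apache_beam.portability.api.beam_runner_api_pb2}}); the temp-file handling and log message are illustrative rather than the exact code I ran:

{code:python}
# Illustrative sketch of the step-1 check: serialize the pipeline proto to a
# local file, then read and parse it back the way the worker harness would.
import logging
import tempfile

from apache_beam.portability.api import beam_runner_api_pb2

# Dump the proto to a local file before staging.
with tempfile.NamedTemporaryFile(suffix='.pb', delete=False) as f:
  f.write(job.proto_pipeline.SerializeToString())
  local_path = f.name

# Read it back and parse it as a Pipeline message.
with open(local_path, 'rb') as f:
  parsed = beam_runner_api_pb2.Pipeline()
  parsed.ParseFromString(f.read())

# If this round trip succeeds, the proto itself is well-formed.
logging.info('Parsed pipeline proto with %d transforms',
             len(parsed.components.transforms))
{code}

Since the same bytes parse cleanly locally, the corruption has to be introduced somewhere in the apitools staging path.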
> Support DataflowRunner on Python 3
> ----------------------------------
>
>                 Key: BEAM-5953
>                 URL: https://issues.apache.org/jira/browse/BEAM-5953
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h