[ 
https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498
 ] 

Mark Liu edited comment on BEAM-5953 at 11/13/18 12:12 AM:
-----------------------------------------------------------

To verify that the problem comes from a corrupted proto file, I did two 
things: 1) dumped the proto into a local file before staging and read it back 
the same way DataflowWorkerHarnessHelper.java does; the proto printed 
successfully. 2) replaced the existing upload approach ([this 
line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523])
 with the google-cloud-storage library:
{code:python}
    # Upload pipeline.pb via google-cloud-storage.
    import tempfile
    temp_file = tempfile.NamedTemporaryFile()
    with open(temp_file.name, 'wb') as f:
      f.write(job.proto_pipeline.SerializeToString())

    gcs_location = FileSystems.join(
        job.google_cloud_options.staging_location, 'pipeline.pb')
    # Strip the 'gs://' prefix and split into bucket and object name.
    bucket_name, name = gcs_location[5:].split('/', 1)

    from google.cloud import storage
    client = storage.Client(project='my-project')
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(name)
    logging.info('Starting GCS upload to %s...', gcs_location)
    blob.upload_from_filename(temp_file.name)
{code}
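For reference, the step-1 dump-and-read round trip can be sketched as below. {{payload}} is a hypothetical stand-in for {{job.proto_pipeline.SerializeToString()}} (any serialized proto is just bytes); in the real check, the bytes read back would additionally be parsed with {{ParseFromString}}, the same way DataflowWorkerHarnessHelper.java reads the staged file:
{code:python}
# Sketch of the local round-trip check; 'payload' stands in for the
# serialized pipeline proto from job.proto_pipeline.SerializeToString().
import tempfile

payload = b'\x0a\x07example'  # hypothetical serialized-proto bytes

# Dump the bytes to a local file, as done before staging.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(payload)
    path = f.name

# Read the file back the same way the worker harness would.
with open(path, 'rb') as f:
    restored = f.read()

assert restored == payload  # the bytes survive the round trip unchanged
{code}
If this assertion holds locally but the staged file fails to parse on the worker, the corruption must happen during upload, which is what pointed at the staging code path.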
With this change, the runner harness started successfully; see [this 
job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe].
 (The job hung because SDK harness startup failed, which is unrelated to this 
error; the [harness-startup 
log|https://pantheon.corp.google.com/logs/viewer?resource=dataflow_step%2Fjob_id%2F2018-11-12_15_25_55-9696772999324855617&logName=projects%2Fgoogle.com:clouddfe%2Flogs%2Fdataflow.googleapis.com%252Fharness-startup&interval=NO_LIMIT&project=google.com:clouddfe&minLogLevel=0&expandAll=false&timestamp=2018-11-13T00:11:14.449000000Z&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2018-11-12T23:26:40.731883000Z]
 shows the successful messages.)
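As an aside, the manual {{gcs_location[5:].split('/', 1)}} slicing in the snippet above assumes a well-formed {{gs://}} path. A slightly more robust split (a sketch using a hypothetical staging path, not the job's actual one) can use {{urllib.parse}}:
{code:python}
from urllib.parse import urlparse

# Hypothetical staging location; the real one comes from
# job.google_cloud_options.staging_location.
gcs_location = 'gs://my-bucket/staging/pipeline.pb'

parsed = urlparse(gcs_location)
bucket_name = parsed.netloc        # 'my-bucket'
name = parsed.path.lstrip('/')     # 'staging/pipeline.pb'
{code}
This also makes the intent explicit and fails more predictably on a malformed location than slicing off the first five characters.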

This could be another case in favor of moving the SDK off google-apitools 
(https://issues.apache.org/jira/browse/BEAM-4850).



> Support DataflowRunner on Python 3
> ----------------------------------
>
>                 Key: BEAM-5953
>                 URL: https://issues.apache.org/jira/browse/BEAM-5953
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
