BTW it doesn't seem to be related to the BQ sink. My job is failing now too
without that part (and it wasn't earlier today):

def test_error(
    bq_table: str) -> str:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class GenData(beam.DoFn):
        def process(self, _):
            for _ in range (20000):
                yield {'a':1,'b':2}

    pipeline = beam.Pipeline(options=PipelineOptions(
        project='my-project',
        temp_location = 'gs://my-bucket/temp',
        staging_location = 'gs://my-bucket/staging',
        runner='DataflowRunner'
    ))
    #pipeline = beam.Pipeline()

    (
        pipeline
        | 'Empty start' >> beam.Create([''])
        | 'Generate Data' >> beam.ParDo(GenData())
        | 'print' >> beam.Map(print)
    )

    result = pipeline.run()
    result.wait_until_finish()

    return True

test_error(
    bq_table = 'my-project:my_dataset.my_table'
)

On Tue, Feb 4, 2020 at 11:21 AM Alan Krumholz <alan.krumh...@betterup.co>
wrote:

> Here is a test job that sometimes fails and sometimes doesn't (but most
> times do).....
> There seems to be something stochastic that causes this as after several
> tests a couple of them did succeed....
>
>
> def test_error(
>     bq_table: str) -> str:
>
>     import apache_beam as beam
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     class GenData(beam.DoFn):
>         def process(self, _):
>             for _ in range (20000):
>                 yield {'a':1,'b':2}
>
>
>     def get_bigquery_schema():
>         from apache_beam.io.gcp.internal.clients import bigquery
>
>         table_schema = bigquery.TableSchema()
>         columns = [
>             ["a","integer","nullable"],
>             ["b","integer","nullable"]
>         ]
>
>         for column in columns:
>             column_schema = bigquery.TableFieldSchema()
>             column_schema.name = column[0]
>             column_schema.type = column[1]
>             column_schema.mode = column[2]
>             table_schema.fields.append(column_schema)
>
>         return table_schema
>
>     pipeline = beam.Pipeline(options=PipelineOptions(
>         project='my-project',
>         temp_location = 'gs://my-bucket/temp',
>         staging_location = 'gs://my-bucket/staging',
>         runner='DataflowRunner'
>     ))
>     #pipeline = beam.Pipeline()
>
>     (
>         pipeline
>         | 'Empty start' >> beam.Create([''])
>         | 'Generate Data' >> beam.ParDo(GenData())
>         #| 'print' >> beam.Map(print)
>         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
>                     project=bq_table.split(':')[0],
>                     dataset=bq_table.split(':')[1].split('.')[0],
>                     table=bq_table.split(':')[1].split('.')[1],
>                     schema=get_bigquery_schema(),
>
> create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
>
> write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
>     )
>
>     result = pipeline.run()
>     result.wait_until_finish()
>
>     return True
>
> test_error(
>     bq_table = 'my-project:my_dataset.my_table'
> )
>
> On Tue, Feb 4, 2020 at 10:04 AM Alan Krumholz <alan.krumh...@betterup.co>
> wrote:
>
>> I tried breaking apart my pipeline. Seems the step that breaks it is:
>> beam.io.WriteToBigQuery
>>
>> Let me see if I can create a self contained example that breaks to share
>> with you
>>
>> Thanks!
>>
>> On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada <pabl...@google.com> wrote:
>>
>>> Hm that's odd. No changes to the pipeline? Are you able to share some of
>>> the code?
>>>
>>> +Udi Meiri <eh...@google.com> do you have any idea what could be going
>>> on here?
>>>
>>> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz <alan.krumh...@betterup.co>
>>> wrote:
>>>
>>>> Hi Pablo,
>>>> This is strange... it doesn't seem to be the last beam release as last
>>>> night it was already using 2.19.0 I wonder if it was some release from the
>>>> DataFlow team (not beam related):
>>>> Job typeBatch
>>>> Job status Succeeded
>>>> SDK version
>>>> Apache Beam Python 3.5 SDK 2.19.0
>>>> Region
>>>> us-central1
>>>> Start timeFebruary 3, 2020 at 9:28:35 PM GMT-8
>>>> Elapsed time5 min 11 sec
>>>>
>>>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada <pabl...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Alan,
>>>>> could it be that you're picking up the new Apache Beam 2.19.0 release?
>>>>> Could you try depending on beam 2.18.0 to see if the issue surfaces when
>>>>> using the new release?
>>>>>
>>>>> If something was working and no longer works, it sounds like a bug.
>>>>> This may have to do with how we pickle (dill / cloudpickle) - see this
>>>>> question
>>>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>>>> Best
>>>>> -P.
>>>>>
>>>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz <
>>>>> alan.krumh...@betterup.co> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I was running a dataflow job in GCP last night and it was running
>>>>>> fine.
>>>>>> This morning this same exact job is failing with the following error:
>>>>>>
>>>>>> Error message from worker: Traceback (most recent call last): File
>>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>>>> line 286, in loads return dill.loads(s) File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in 
>>>>>> loads
>>>>>> return load(file, ignore, **kwds) File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>>>> obj = StockUnpickler.load(self) File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>>>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' During
>>>>>> handling of the above exception, another exception occurred: Traceback
>>>>>> (most recent call last): File
>>>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py",
>>>>>> line 648, in do_work work_executor.execute() File
>>>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", 
>>>>>> line
>>>>>> 176, in execute op.start() File 
>>>>>> "apache_beam/runners/worker/operations.py",
>>>>>> line 649, in apache_beam.runners.worker.operations.DoOperation.start File
>>>>>> "apache_beam/runners/worker/operations.py", line 651, in
>>>>>> apache_beam.runners.worker.operations.DoOperation.start File
>>>>>> "apache_beam/runners/worker/operations.py", line 652, in
>>>>>> apache_beam.runners.worker.operations.DoOperation.start File
>>>>>> "apache_beam/runners/worker/operations.py", line 261, in
>>>>>> apache_beam.runners.worker.operations.Operation.start File
>>>>>> "apache_beam/runners/worker/operations.py", line 266, in
>>>>>> apache_beam.runners.worker.operations.Operation.start File
>>>>>> "apache_beam/runners/worker/operations.py", line 597, in
>>>>>> apache_beam.runners.worker.operations.DoOperation.setup File
>>>>>> "apache_beam/runners/worker/operations.py", line 602, in
>>>>>> apache_beam.runners.worker.operations.DoOperation.setup File
>>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>>>> line 290, in loads return dill.loads(s) File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in 
>>>>>> loads
>>>>>> return load(file, ignore, **kwds) File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>>>> obj = StockUnpickler.load(self) File
>>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>>>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType'
>>>>>>
>>>>>>
>>>>>> If I use a local runner it still runs fine.
>>>>>> Anyone else experiencing something similar today? (or know how to fix
>>>>>> this?)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>

Reply via email to