BTW, it doesn't seem to be related to the BQ sink. My job is now failing even without that part (and it wasn't failing earlier today):
def test_error( bq_table: str) -> str: import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions class GenData(beam.DoFn): def process(self, _): for _ in range (20000): yield {'a':1,'b':2} pipeline = beam.Pipeline(options=PipelineOptions( project='my-project', temp_location = 'gs://my-bucket/temp', staging_location = 'gs://my-bucket/staging', runner='DataflowRunner' )) #pipeline = beam.Pipeline() ( pipeline | 'Empty start' >> beam.Create(['']) | 'Generate Data' >> beam.ParDo(GenData()) | 'print' >> beam.Map(print) ) result = pipeline.run() result.wait_until_finish() return True test_error( bq_table = 'my-project:my_dataset.my_table' ) On Tue, Feb 4, 2020 at 11:21 AM Alan Krumholz <alan.krumh...@betterup.co> wrote: > Here is a test job that sometimes fails and sometimes doesn't (but most > times do)..... > There seems to be something stochastic that causes this as after several > tests a couple of them did succeed.... > > > def test_error( > bq_table: str) -> str: > > import apache_beam as beam > from apache_beam.options.pipeline_options import PipelineOptions > > class GenData(beam.DoFn): > def process(self, _): > for _ in range (20000): > yield {'a':1,'b':2} > > > def get_bigquery_schema(): > from apache_beam.io.gcp.internal.clients import bigquery > > table_schema = bigquery.TableSchema() > columns = [ > ["a","integer","nullable"], > ["b","integer","nullable"] > ] > > for column in columns: > column_schema = bigquery.TableFieldSchema() > column_schema.name = column[0] > column_schema.type = column[1] > column_schema.mode = column[2] > table_schema.fields.append(column_schema) > > return table_schema > > pipeline = beam.Pipeline(options=PipelineOptions( > project='my-project', > temp_location = 'gs://my-bucket/temp', > staging_location = 'gs://my-bucket/staging', > runner='DataflowRunner' > )) > #pipeline = beam.Pipeline() > > ( > pipeline > | 'Empty start' >> beam.Create(['']) > | 'Generate Data' >> beam.ParDo(GenData()) > #| 
'print' >> beam.Map(print) > | 'Write to BigQuery' >> beam.io.WriteToBigQuery( > project=bq_table.split(':')[0], > dataset=bq_table.split(':')[1].split('.')[0], > table=bq_table.split(':')[1].split('.')[1], > schema=get_bigquery_schema(), > > create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, > > write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE) > ) > > result = pipeline.run() > result.wait_until_finish() > > return True > > test_error( > bq_table = 'my-project:my_dataset.my_table' > ) > > On Tue, Feb 4, 2020 at 10:04 AM Alan Krumholz <alan.krumh...@betterup.co> > wrote: > >> I tried breaking apart my pipeline. Seems the step that breaks it is: >> beam.io.WriteToBigQuery >> >> Let me see if I can create a self contained example that breaks to share >> with you >> >> Thanks! >> >> On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada <pabl...@google.com> wrote: >> >>> Hm that's odd. No changes to the pipeline? Are you able to share some of >>> the code? >>> >>> +Udi Meiri <eh...@google.com> do you have any idea what could be going >>> on here? >>> >>> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz <alan.krumh...@betterup.co> >>> wrote: >>> >>>> Hi Pablo, >>>> This is strange... it doesn't seem to be the last beam release as last >>>> night it was already using 2.19.0 I wonder if it was some release from the >>>> DataFlow team (not beam related): >>>> Job typeBatch >>>> Job status Succeeded >>>> SDK version >>>> Apache Beam Python 3.5 SDK 2.19.0 >>>> Region >>>> us-central1 >>>> Start timeFebruary 3, 2020 at 9:28:35 PM GMT-8 >>>> Elapsed time5 min 11 sec >>>> >>>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada <pabl...@google.com> >>>> wrote: >>>> >>>>> Hi Alan, >>>>> could it be that you're picking up the new Apache Beam 2.19.0 release? >>>>> Could you try depending on beam 2.18.0 to see if the issue surfaces when >>>>> using the new release? >>>>> >>>>> If something was working and no longer works, it sounds like a bug. 
>>>>> This may have to do with how we pickle (dill / cloudpickle) - see this >>>>> question >>>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype >>>>> Best >>>>> -P. >>>>> >>>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz < >>>>> alan.krumh...@betterup.co> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I was running a dataflow job in GCP last night and it was running >>>>>> fine. >>>>>> This morning this same exact job is failing with the following error: >>>>>> >>>>>> Error message from worker: Traceback (most recent call last): File >>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >>>>>> line 286, in loads return dill.loads(s) File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in >>>>>> loads >>>>>> return load(file, ignore, **kwds) File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load >>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load >>>>>> obj = StockUnpickler.load(self) File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in >>>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' During >>>>>> handling of the above exception, another exception occurred: Traceback >>>>>> (most recent call last): File >>>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", >>>>>> line 648, in do_work work_executor.execute() File >>>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", >>>>>> line >>>>>> 176, in execute op.start() File >>>>>> "apache_beam/runners/worker/operations.py", >>>>>> line 649, in apache_beam.runners.worker.operations.DoOperation.start File >>>>>> "apache_beam/runners/worker/operations.py", line 651, in >>>>>> apache_beam.runners.worker.operations.DoOperation.start File >>>>>> "apache_beam/runners/worker/operations.py", line 
652, in >>>>>> apache_beam.runners.worker.operations.DoOperation.start File >>>>>> "apache_beam/runners/worker/operations.py", line 261, in >>>>>> apache_beam.runners.worker.operations.Operation.start File >>>>>> "apache_beam/runners/worker/operations.py", line 266, in >>>>>> apache_beam.runners.worker.operations.Operation.start File >>>>>> "apache_beam/runners/worker/operations.py", line 597, in >>>>>> apache_beam.runners.worker.operations.DoOperation.setup File >>>>>> "apache_beam/runners/worker/operations.py", line 602, in >>>>>> apache_beam.runners.worker.operations.DoOperation.setup File >>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >>>>>> line 290, in loads return dill.loads(s) File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in >>>>>> loads >>>>>> return load(file, ignore, **kwds) File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load >>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load >>>>>> obj = StockUnpickler.load(self) File >>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in >>>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' >>>>>> >>>>>> >>>>>> If I use a local runner it still runs fine. >>>>>> Anyone else experiencing something similar today? (or know how to fix >>>>>> this?) >>>>>> >>>>>> Thanks! >>>>>> >>>>>