Dear Community,

I'm trying to use the following script to write into Bigtable using
Beam/Dataflow, but I get the error below. Has anyone managed to run this
script?

Thanks for your support!

*Python*: 3.7

*Code*

1) Create instance
gcloud beta bigtable instances create test-instance --cluster test-cluster
--display-name test-instance --cluster-zone us-central1-a
--cluster-num-nodes 3

2) Main logic
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigtableio.py

3) Homemade pipeline

# Options for a streaming job submitted through the Dataflow runner.
# With save_main_session=True, the traceback further down shows Beam calling
# pickler.dump_session() on the __main__ module at submission time, so every
# global reachable from __main__ has to be picklable.
pipeline_options = PipelineOptions(
    save_main_session=True,
    streaming=True,
    runner='DataflowRunner',
    project=PROJECT,
    region=REGION,
    temp_location=TEMP_LOCATION,
    staging_location=STAGING_LOCATION,
)

def run():
    """Build and run the Pub/Sub -> Bigtable pipeline.

    Reads raw messages from the Pub/Sub subscription identified by the
    module-level PROJECT and SUBSCRIPTION names, decodes each payload from
    UTF-8 bytes to ``str``, and hands the result to ``WriteToBigTable`` for
    the table identified by PROJECT, INSTANCE and TABLE.

    The pipeline is used as a context manager, so it is run when the
    ``with`` block exits (see ``Pipeline.__exit__`` in the traceback).
    """
    with beam.Pipeline(options=pipeline_options) as p:
        input_subscription = f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"

        # NOTE(review): WriteToBigTable is documented as consuming DirectRow
        # elements, while the previous step emits str -- confirm whether a
        # conversion step is needed before the write.
        _ = (
            p
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                subscription=input_subscription).with_output_types(bytes)
            | 'Conversion UTF-8 bytes to string' >> beam.Map(
                lambda msg: msg.decode('utf-8'))
            | WriteToBigTable(PROJECT, INSTANCE, TABLE)
        )

*Error*

run()
Traceback (most recent call last):

  File "<ipython-input-49-ec9775ede022>", line 1, in <module>
    run()

  File "/<mycode>/bigtableio.py", line 300, in run
    TABLE))

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 582, in __exit__
    self.result = self.run()

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 532, in run
    self._options).run(False)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 558, in run
    pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 313, in dump_session
    return dill.dump_session(file_path)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 351, in dump_session
    pickler.dump(main)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 445, in dump
    StockPickler.dump(self, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 437, in dump
    self.save(obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 173, in save_module
    return old_save_module(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 1295, in save_module
    state=_main_dict)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 524, in save
    rv = reduce(self.proto)

  File "stringsource", line 2, in
grpc._cython.cygrpc.Channel.__reduce_cython__

TypeError: no default __reduce__ due to non-trivial __cinit__

-- 
Best regards, Pierre

Reply via email to