Hi,
I'm working on a pipeline that reads data from Pub/Sub and splits it
into several BigQuery tables. I have a problem with temporary tables:
the pipeline creates them (I have to compute the table ID and schema
based on the data) but does not delete all of them. Is it possible
to set a different dataset for temporary tables, or to batch the data
into a list of dicts and push more rows at once? I tried
windowing + GroupByKey, where the key is the table name. This produced
a list of dictionaries, but I got the following error message:
"""
File "fastavro/_write.pyx", line 732, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 469, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 459, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 357, in fastavro._write.write_record
TypeError: Error writing row to Avro: unhashable type: 'dict'
"""
Without the grouping and windowing everything works fine, but the pipeline
creates a lot of temporary tables that clutter our datasets.
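
To make the grouping step concrete, here is a minimal pure-Python sketch (not the actual pipeline code) of what grouping by table name produces, and how a grouped element could be flattened back into single-row dicts before the write. The names group_rows_by_table and flatten_groups and the sample rows are hypothetical. The assumption, which is not confirmed, is that the Avro writer expects one dict per element and fails when it receives a list of dicts.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def group_rows_by_table(rows: Iterable[Tuple[str, Row]]) -> Dict[str, List[Row]]:
    """Mimic a per-window GroupByKey: table name -> list of row dicts."""
    grouped: Dict[str, List[Row]] = defaultdict(list)
    for table, row in rows:
        grouped[table].append(row)
    return dict(grouped)


def flatten_groups(grouped: Dict[str, List[Row]]) -> Iterator[Tuple[str, Row]]:
    """Undo the grouping so that every emitted row is a single dict again."""
    for table, table_rows in grouped.items():
        for row in table_rows:
            yield table, row


# Hypothetical sample data: (table name, row) pairs.
keyed_rows = [
    ("events_a", {"id": 1, "payload": "x"}),
    ("events_b", {"id": 2, "payload": "y"}),
    ("events_a", {"id": 3, "payload": "z"}),
]

grouped = group_rows_by_table(keyed_rows)   # each value is a list of dicts
flattened = list(flatten_groups(grouped))   # each row is one dict again
```

In Beam, the second step would roughly correspond to a FlatMap placed after the GroupByKey, so that the write step again receives one dict per element.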
These are my BigQuery options:
method="FILE_LOADS"
triggering_frequency=60
max_files_per_bundle=1
temp_file_format="AVRO"
table="" << this is computed
dataset="" << this is computed
project="my_project"
custom_gcs_temp_location="gs://my_gcp_tmp_location/temp/"
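
For reference, the options above could be collected as keyword arguments roughly as follows. This is only a sketch, not the real job code: compute_table_spec, the DATASET value and the event_type field are hypothetical stand-ins for the computed table and dataset, while the keyword names mirror the options listed above.

```python
from typing import Any, Dict

PROJECT = "my_project"
DATASET = "my_dataset"  # hypothetical; the real dataset is computed from the data


def compute_table_spec(row: Dict[str, Any]) -> str:
    """Hypothetical helper: derive 'project:dataset.table' from a row field."""
    return f"{PROJECT}:{DATASET}.{row['event_type']}"


# Keyword arguments mirroring the BigQuery options listed above.
bigquery_write_kwargs = {
    "table": compute_table_spec,  # computed per element
    "method": "FILE_LOADS",
    "triggering_frequency": 60,
    "max_files_per_bundle": 1,
    "temp_file_format": "AVRO",
    "custom_gcs_temp_location": "gs://my_gcp_tmp_location/temp/",
}
```

These would then be passed along the lines of beam.io.WriteToBigQuery(**bigquery_write_kwargs), together with the computed schema.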

These are my pipeline options:
runner="DataflowRunner"
region="my_region"
project="my_project"
service_account_email="my_df_SA"
temp_location="gs://my_gcp_location/temp/"
max_num_workers=10
no_use_public_ips=true
use_public_ips=false
network="my_network_url"
subnetwork="my_subnetwork_url"
machine_type="my_machine_type"
label="pubsub-qa-label"
job_name="job-name-pubsub-qa"
save_main_session=false
streaming=true
setup_file="./setup.py"

Thank you in advance for your reply and best regards,
Paweł Jarkowski
