Python Apache Beam BigQuery temporary tables

2024-05-16 Thread Pawel Jarkowski via dev
Hi,
I'm working on a pipeline that reads data from Pub/Sub and splits it across
a few BigQuery tables. I have a problem with temporary tables: the pipeline
creates them (I have to compute the table ID and schema based on the data),
but it does not delete all of them. Is there any way to set a different
dataset for the temporary tables, or to batch the data into a list of dicts
and push more rows at once? I tried windowing + GroupByKey, where the key is
the table name, and this produced a list of dictionaries, but I got this
error message:
"""
File "fastavro/_write.pyx", line 732, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 469, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 459, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 357, in fastavro._write.write_record
TypeError: Error writing row to Avro: unhashable type: 'dict'
"""
Without the grouping and windowing everything works fine, but the pipeline
creates a lot of temporary tables, which makes a mess in our datasets.
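The grouping attempt had roughly this shape (a simplified sketch; `rows` and
`compute_table_name` are placeholders for the real PCollection and
table-name logic):

import apache_beam as beam
from apache_beam.transforms import window

# Simplified sketch of the windowing + GroupByKey attempt described above.
# compute_table_name is a placeholder for the logic that derives the
# destination table from each element.
grouped = (
    rows
    | "Window" >> beam.WindowInto(window.FixedWindows(60))
    | "KeyByTable" >> beam.Map(lambda row: (compute_table_name(row), row))
    | "GroupByTable" >> beam.GroupByKey()  # -> (table_name, [row_dict, ...])
)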
These are my BigQuery options:
method="FILE_LOADS"
triggering_frequency=60
max_files_per_bundle=1
temp_file_format="AVRO"
table="" << this is computed
dataset="" << this is computed
project="my_project"
custom_gcs_temp_location="gs://my_gcp_tmp_location/temp/"
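
In code, those options are applied to the sink roughly like this (simplified
sketch; `rows` and `compute_table` are placeholders, and the computed schema
is omitted):

import apache_beam as beam

# Simplified sketch; compute_table is a placeholder for the logic that builds
# the "project:dataset.table" destination from each element. The computed
# schema is passed the same way and is omitted here.
rows | beam.io.WriteToBigQuery(
    table=compute_table,
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    triggering_frequency=60,
    max_files_per_bundle=1,
    temp_file_format="AVRO",
    project="my_project",
    custom_gcs_temp_location="gs://my_gcp_tmp_location/temp/",
)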

These are my pipeline options:
runner="DataflowRunner"
region="my_region"
project="my_project"
service_account_email="my_df_SA"
temp_location="gs://my_gcp_location/temp/"
max_num_workers=10
no_use_public_ips=true
use_public_ips=false
network="my_network_url"
subnetwork="my_subnetwork_url"
machine_type="my_machine_type"
label="pubsub-qa-label"
job_name="job-name-pubsub-qa"
save_main_session=false
streaming=true
setup_file="./setup.py"
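
For completeness, those options are built roughly like this (simplified
sketch; the values are the same placeholders as in the list above, and a few
flags are omitted):

from apache_beam.options.pipeline_options import PipelineOptions

# Simplified sketch of the pipeline options; real values are placeholders.
pipeline_options = PipelineOptions(
    runner="DataflowRunner",
    project="my_project",
    region="my_region",
    temp_location="gs://my_gcp_location/temp/",
    streaming=True,
    save_main_session=False,
    max_num_workers=10,
    machine_type="my_machine_type",
    job_name="job-name-pubsub-qa",
    setup_file="./setup.py",
)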

Thank you in advance for your reply and best regards,
Paweł Jarkowski


Re: Python Apache Beam BigQuery temporary tables

2024-05-17 Thread XQ Hu via dev
I think you hit this issue:
https://stackoverflow.com/questions/64526500/using-writetobigquery-file-loads-in-a-streaming-pipeline-just-creates-a-lot-of-t
I created https://github.com/apache/beam/issues/31329 to track this bug.

Some workarounds:
1. https://cloud.google.com/dataflow/docs/guides/write-to-bigquery has more
documentation about the different write methods. I would recommend
STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE (see the sketch after this
list).
2. If you have another project, you could use `load_job_project_id` to put
these temp tables there and clean them up regularly.
3. As the comments on the Stack Overflow question suggest, write the files to
GCS yourself and then load them into BigQuery.
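
For option 1, the change is roughly this (a minimal sketch; compute_table
stands in for your existing computed table logic):

import apache_beam as beam

# Minimal sketch of option 1: keep the computed destination, but write through
# the BigQuery Storage Write API instead of file loads, which should avoid the
# load-job temp tables. compute_table is a placeholder; schema and the
# create/write dispositions are omitted.
rows | beam.io.WriteToBigQuery(
    table=compute_table,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
)

For option 2, you would stay on FILE_LOADS and pass
load_job_project_id="your_other_project" to WriteToBigQuery.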

