[ https://issues.apache.org/jira/browse/BEAM-6892?focusedWorklogId=235605&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235605 ]
ASF GitHub Bot logged work on BEAM-6892:
----------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 00:04
Start Date: 01/May/19 00:04
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #8102: [BEAM-6892] Adding support for side inputs on table & schema. Also adding additional params for BQ.
URL: https://github.com/apache/beam/pull/8102#discussion_r279955817

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########

@@ -78,15 +78,120 @@
 or a table. Pipeline construction will fail with a validation error if
 neither or both are specified.

-**Time partitioned tables**
+Writing Data to BigQuery
+========================
+
+The `WriteToBigQuery` transform is the recommended way of writing data to
+BigQuery. It supports a large set of parameters to customize how you'd like
+to write to BigQuery.
+
+Table References
+----------------
+
+This transform allows you to provide static `project`, `dataset` and `table`
+parameters which point to a specific BigQuery table to be created. The
+`table` parameter can also be a dynamic parameter (i.e. a callable), which
+receives an element to be written to BigQuery, and returns the table that
+the element should be sent to.
+
+You may also provide a tuple of PCollectionView elements to be passed as
+side inputs to your callable.
+For example, suppose that one wishes to send events of different types to
+different tables, and the table names are computed at pipeline runtime. One
+may do something like the following::
+
+  with Pipeline() as p:
+    elements = (p | beam.Create([
+        {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
+        {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
+    ]))
+
+    table_names = (p | beam.Create([
+        ('error', 'my_project.dataset1.error_table_for_today'),
+        ('user_log', 'my_project.dataset1.query_table_for_today'),
+    ]))
+
+    table_names_dict = beam.pvalue.AsDict(table_names)
+
+    elements | beam.io.gcp.WriteToBigQuery(
+        table=lambda row, table_dict: table_dict[row['type']],

Review comment:
   Probably explain (in text) that this table_dict is the side input that is passed below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 235605)
Time Spent: 5.5h (was: 5h 20m)

> Use temp_location for BQ FILE_LOADS on DirectRunner, and autocreate it in
> GCS if not specified by user.
> -------------------------------------------------------------------------
>
>                 Key: BEAM-6892
>                 URL: https://issues.apache.org/jira/browse/BEAM-6892
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Pablo Estrada
>            Priority: Major
>             Fix For: 2.13.0
>
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
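To illustrate the review comment above: the `table_dict` argument received by the `table` callable is the materialized side-input dict built from `table_names`. The routing logic can be sketched in plain Python, without running a pipeline, using a literal dict to stand in for `beam.pvalue.AsDict(table_names)` (the names here mirror the docstring example; this is a sketch of the lookup, not Beam itself):

```python
# A literal dict standing in for the beam.pvalue.AsDict(table_names) side
# input, which Beam materializes and passes to the table callable.
table_dict = {
    'error': 'my_project.dataset1.error_table_for_today',
    'user_log': 'my_project.dataset1.query_table_for_today',
}

def table_fn(row, table_dict):
    # Same logic as the lambda in the example: route each element to a
    # destination table keyed by its 'type' field.
    return table_dict[row['type']]

rows = [
    {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
    {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
]

destinations = [table_fn(row, table_dict) for row in rows]
```

Each element thus resolves to its own table string, which is how a single `WriteToBigQuery` transform can fan out to multiple destinations.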