I have created this issue (https://issues.apache.org/jira/browse/BEAM-8452).

The contribution guide suggests that if an issue is your first, it should
be discussed on this mailing list. I would like to hear thoughts,
questions, concerns, etc. on this proposed fix:

def process(self, element, load_job_name_prefix, *schema_side_inputs):
  # Each load job is assumed to have files respecting these constraints:
  # 1. Total size of all files < 15 TB (Max size for load jobs)
  # 2. Total no. of files in a single load job < 10,000
  # This assumption means that there will always be a single load job
  # triggered for each partition of files.
  destination = element[0]
  files = element[1]

  if callable(self.schema):
    schema = self.schema(destination, *schema_side_inputs)
  elif isinstance(self.schema, vp.ValueProvider):
    schema = self.schema.get()
  else:
    schema = self.schema

  if isinstance(schema, (str, unicode)):
    schema = bigquery_tools.parse_table_schema_from_json(schema)
  elif isinstance(schema, dict):
    schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))

  ....
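For context, here is a minimal standalone sketch of what the new elif
branch does with a dict schema, using the existing bigquery_tools helper.
The example schema contents are illustrative, not from the issue:

import json
from apache_beam.io.gcp import bigquery_tools

# A schema passed as a dict, following the standard BigQuery JSON
# schema shape ({'fields': [...]}).
schema = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]
}

# The proposed branch serializes the dict back to JSON and parses it
# into a TableSchema, instead of passing the raw dict through to the
# load-job path.
table_schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))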
