I have created this issue (https://issues.apache.org/jira/browse/BEAM-8452).
The contributor's guide suggests that a first issue should be discussed on this mailing list, so I would like to hear thoughts, questions, and concerns about this proposed fix:

    def process(self, element, load_job_name_prefix, *schema_side_inputs):
      # Each load job is assumed to have files respecting these constraints:
      # 1. Total size of all files < 15 TB (Max size for load jobs)
      # 2. Total no. of files in a single load job < 10,000
      # This assumption means that there will always be a single load job
      # triggered for each partition of files.
      destination = element[0]
      files = element[1]

      if callable(self.schema):
        schema = self.schema(destination, *schema_side_inputs)
      elif isinstance(self.schema, vp.ValueProvider):
        schema = self.schema.get()
      else:
        schema = self.schema

      if isinstance(schema, (str, unicode)):
        schema = bigquery_tools.parse_table_schema_from_json(schema)
      elif isinstance(schema, dict):
        schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
      ...
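For readers outside the Beam codebase, here is a minimal standalone sketch of the schema-resolution dispatch in the fix above. `StaticValueProvider` and `resolve_schema` are hypothetical stand-ins for illustration only (the real code uses `apache_beam`'s `ValueProvider` and `bigquery_tools.parse_table_schema_from_json`; plain `json.loads` stands in for the schema parser here):

```python
import json


class StaticValueProvider:
    # Hypothetical stand-in for apache_beam's ValueProvider:
    # a value that is only resolved at pipeline run time.
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


def resolve_schema(schema, destination, *schema_side_inputs):
    # Mirrors the dispatch in the proposed fix:
    # 1. a callable schema is invoked per destination with any side inputs;
    # 2. a ValueProvider-style wrapper is unwrapped via .get();
    # 3. the result is then normalized: JSON strings are parsed,
    #    dicts pass through unchanged.
    if callable(schema):
        schema = schema(destination, *schema_side_inputs)
    elif isinstance(schema, StaticValueProvider):
        schema = schema.get()
    if isinstance(schema, str):
        # Stand-in for bigquery_tools.parse_table_schema_from_json.
        schema = json.loads(schema)
    return schema


# Usage: a callable schema that can vary per destination table.
per_table = lambda dest, *_: {"fields": [{"name": dest + "_id"}]}
print(resolve_schema(per_table, "users"))
print(resolve_schema(StaticValueProvider('{"fields": []}'), "users"))
```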