Hi Noah! Thanks for contributing this. The change is relatively small, so I think it's best to discuss it in a pull request, where we can look at the diff. My preliminary comment would be that the first part makes sense:
    if isinstance(schema, (str, unicode)):
      schema = bigquery_tools.parse_table_schema_from_json(schema)

And the second part seems unnecessary, but perhaps I'm missing something. Why are you reparsing the schema?

    elif isinstance(schema, dict):
      schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))

Thanks! Please submit a pull request, and tag me (@pabloem) to review.
Best
-P.

On Mon, Oct 21, 2019 at 3:38 PM Noah Goodrich <m...@noahgoodrich.com> wrote:
> I have created this issue
> (https://issues.apache.org/jira/browse/BEAM-8452).
>
> The contributors guide suggests that if an issue is your first, it should
> be discussed on this mailing list. I would like to hear thoughts,
> questions, concerns etc. on this proposed fix:
>
>   def process(self, element, load_job_name_prefix, *schema_side_inputs):
>     # Each load job is assumed to have files respecting these constraints:
>     # 1. Total size of all files < 15 TB (Max size for load jobs)
>     # 2. Total no. of files in a single load job < 10,000
>     # This assumption means that there will always be a single load job
>     # triggered for each partition of files.
>     destination = element[0]
>     files = element[1]
>
>     if callable(self.schema):
>       schema = self.schema(destination, *schema_side_inputs)
>     elif isinstance(self.schema, vp.ValueProvider):
>       schema = self.schema.get()
>     else:
>       schema = self.schema
>
>     if isinstance(schema, (str, unicode)):
>       schema = bigquery_tools.parse_table_schema_from_json(schema)
>     elif isinstance(schema, dict):
>       schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
>
>     ....
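
For readers following the thread: the dict branch under discussion is a serialize-then-parse round trip, since `parse_table_schema_from_json` expects a JSON string rather than a dict. Below is a minimal, self-contained sketch of that dispatch using only the stdlib; `normalize_schema` is a hypothetical name, and `json.loads` stands in for `bigquery_tools.parse_table_schema_from_json` purely for illustration (the real function returns a `TableSchema` object, not a dict).

```python
import json

def normalize_schema(schema):
    # Hypothetical helper sketching the proposed dispatch. The real code
    # calls bigquery_tools.parse_table_schema_from_json, which takes a
    # JSON *string*; a dict schema must therefore be serialized with
    # json.dumps before it can be parsed.
    if isinstance(schema, str):
        # Already a JSON string: parse it directly.
        return json.loads(schema)
    elif isinstance(schema, dict):
        # Dict: round-trip through JSON so both inputs take the same path.
        return json.loads(json.dumps(schema))
    # Anything else (e.g. an already-parsed schema) passes through.
    return schema

fields = {"fields": [{"name": "id", "type": "INTEGER", "mode": "REQUIRED"}]}
# A JSON-string schema and the equivalent dict normalize identically.
assert normalize_schema(json.dumps(fields)) == normalize_schema(fields)
```

This also illustrates Pablo's question above: if the schema is already a dict, serializing it only to reparse it is extra work unless the parser's output type (a `TableSchema`, in the real code) is what downstream code requires.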