AnandInguva commented on code in PR #30146: URL: https://github.com/apache/beam/pull/30146#discussion_r1478572282
########## sdks/python/apache_beam/ml/transforms/handlers.py: ##########
@@ -83,20 +85,52 @@ tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]

+class DataCoder:
+  def __init__(self, exclude_columns=None):
+    """
+    Uses PickleCoder to encode/decode the dictionaries.
+    Args:
+      exclude_columns: list of columns to exclude from the encoding.
+    """
+    self.coder = coders.registry.get_coder(Any)
+    self.exclude_columns = exclude_columns
+
+  def set_unused_columns(self, exclude_columns):
+    self.exclude_columns = exclude_columns
+
+  def encode(self, element):
+    if not self.exclude_columns:

Review Comment:
Yes, that's right, but it still errors out. We add the temp ID column name to the schema during pipeline construction, so TFT fails if the PCollection doesn't contain the temp ID column. That's why, when there are no unused columns, we still have to encode an empty dict and pass it through the PCollection.

--
This is an automated message from the Apache Git Service.
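The reasoning in the comment can be illustrated with a minimal sketch: `encode` always returns bytes, even when there are no unused columns, so the temp ID column declared in the schema is never missing. This is not the PR's code. `DataCoderSketch` and `TEMP_ID_COLUMN` are hypothetical names, and the stdlib `pickle` module stands in for the coder returned by `coders.registry.get_coder(Any)`.

```python
import pickle
from typing import Any, Dict, Iterable, Optional

# Hypothetical name for the temp ID column added to the schema.
TEMP_ID_COLUMN = '__temp_id__'


class DataCoderSketch:
  """Hypothetical stand-in for DataCoder; uses pickle instead of a Beam coder."""
  def __init__(self, exclude_columns: Optional[Iterable[str]] = None):
    self.exclude_columns = list(exclude_columns) if exclude_columns else []

  def set_unused_columns(self, exclude_columns: Optional[Iterable[str]]):
    self.exclude_columns = list(exclude_columns) if exclude_columns else []

  def encode(self, element: Dict[str, Any]) -> bytes:
    # With no unused columns, still encode an empty dict so that the
    # temp ID column always carries a value and the schema is satisfied.
    if not self.exclude_columns:
      return pickle.dumps({})
    # Otherwise, keep only the columns that are excluded from processing.
    return pickle.dumps(
        {k: element[k] for k in self.exclude_columns if k in element})

  def decode(self, encoded: bytes) -> Dict[str, Any]:
    return pickle.loads(encoded)


# Usage: attach the encoded bytes under the temp ID column.
coder = DataCoderSketch()
row = {'x': 1.0, 'label': 'a'}
row_with_id = {'x': row['x'], TEMP_ID_COLUMN: coder.encode(row)}
```

Encoding an empty dict, rather than skipping the column, keeps the element shape identical whether or not any columns are excluded.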
