This is an automated email from the ASF dual-hosted git repository.

ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7272ab5e5b8 [Python] Add type_overrides parameter to BigQuery I/O for 
custom BigQuery-to-Python type mappings (#37253)
7272ab5e5b8 is described below

commit 7272ab5e5b83c6e19e387656dbc698cb7829e839
Author: Enzo Maruffa Moreira <[email protected]>
AuthorDate: Fri May 22 18:57:43 2026 -0300

    [Python] Add type_overrides parameter to BigQuery I/O for custom 
BigQuery-to-Python type mappings (#37253)
    
    * feat(bigquery): add type_overrides parameter for custom BigQuery to 
Python type mappings
    
    * docs(bigquery): fix Sphinx indentation in 
generate_user_type_from_bq_schema docstring
---
 CHANGES.md                                         |   1 +
 sdks/python/apache_beam/io/gcp/bigquery.py         |  25 +++-
 .../apache_beam/io/gcp/bigquery_schema_tools.py    |  63 ++++++--
 .../io/gcp/bigquery_schema_tools_test.py           | 109 ++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  13 +-
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 161 +++++++++++++++++++++
 6 files changed, 346 insertions(+), 26 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index fa4fdb09660..c87c1271280 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -82,6 +82,7 @@
 * Added plugin mechanism to support different Lineage implementations (Java) 
([#36790](https://github.com/apache/beam/issues/36790)).
 * (Python) Supported Python user type in Beam SQL. For example, SQL statements 
like `SELECT some_field from PCOLLECTION` can now operate a PCollection of Beam 
Row containing pickable Python user type 
([#20738](https://github.com/apache/beam/issues/20738)).
 * (Python) Introduced `beam.coders.registry.register_row` as preferred API to 
register a named tuple or dataclass with a Beam Row. At pipelne runtime, the 
original type associated with the registered row are preserved across the 
serialization boundary ([#38108](https://github.com/apache/beam/issues/38108)).
+* (Python) Added a `type_overrides` parameter to `WriteToBigQuery`, allowing 
users to specify custom BigQuery-to-Python type mappings when using the Storage 
Write API. This enables support for types like DATE, DATETIME, and JSON 
([#25946](https://github.com/apache/beam/issues/25946)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index e1bb5583f38..d751d60c905 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2006,7 +2006,8 @@ class WriteToBigQuery(PTransform):
       use_cdc_writes: bool = False,
       primary_key: list[str] = None,
       expansion_service=None,
-      big_lake_configuration=None):
+      big_lake_configuration=None,
+      type_overrides=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -2183,6 +2184,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
         CREATE_IF_NEEDED mode for the underlying tables a list of column names
         is required to be configured as the primary key. Used for
         STORAGE_WRITE_API, working on 'at least once' mode.
+      type_overrides (dict): Optional mapping of BigQuery type names 
(uppercase)
+        to Python types. These override the default type mappings when
+        converting BigQuery schemas to Python types for STORAGE_WRITE_API.
+        For example: ``{'DATE': datetime.date, 'JSON': dict}``.
+        Default mappings include STRING->str, INT64->np.int64, etc.
     """
     self._table = table
     self._dataset = dataset
@@ -2228,6 +2234,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
     self._use_cdc_writes = use_cdc_writes
     self._primary_key = primary_key
     self._big_lake_configuration = big_lake_configuration
+    self._type_overrides = type_overrides
 
   # Dict/schema methods were moved to bigquery_tools, but keep references
   # here for backward compatibility.
@@ -2392,7 +2399,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
           use_cdc_writes=self._use_cdc_writes,
           primary_key=self._primary_key,
           big_lake_configuration=self._big_lake_configuration,
-          expansion_service=self.expansion_service)
+          expansion_service=self.expansion_service,
+          type_overrides=self._type_overrides)
     else:
       raise ValueError(f"Unsupported method {method_to_use}")
 
@@ -2641,7 +2649,8 @@ class StorageWriteToBigQuery(PTransform):
       use_cdc_writes: bool = False,
       primary_key: list[str] = None,
       big_lake_configuration=None,
-      expansion_service=None):
+      expansion_service=None,
+      type_overrides=None):
     self._table = table
     self._table_side_inputs = table_side_inputs
     self._schema = schema
@@ -2655,6 +2664,7 @@ class StorageWriteToBigQuery(PTransform):
     self._use_cdc_writes = use_cdc_writes
     self._primary_key = primary_key
     self._big_lake_configuration = big_lake_configuration
+    self._type_overrides = type_overrides
     self._expansion_service = expansion_service or BeamJarExpansionService(
         'sdks:java:io:google-cloud-platform:expansion-service:build')
 
@@ -2688,7 +2698,7 @@ class StorageWriteToBigQuery(PTransform):
         input_beam_rows = (
             input
             | "Convert dict to Beam Row" >> self.ConvertToBeamRows(
-                schema, False).with_output_types())
+                schema, False, self._type_overrides).with_output_types())
 
     # For dynamic destinations, we first figure out where each row is going.
     # Then we send (destination, record) rows over to Java SchemaTransform.
@@ -2720,7 +2730,7 @@ class StorageWriteToBigQuery(PTransform):
         input_beam_rows = (
             input_rows
             | "Convert dict to Beam Row" >> self.ConvertToBeamRows(
-                schema, True).with_output_types())
+                schema, True, self._type_overrides).with_output_types())
       # communicate to Java that this write should use dynamic destinations
       table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
 
@@ -2788,9 +2798,10 @@ class StorageWriteToBigQuery(PTransform):
       pass
 
   class ConvertToBeamRows(PTransform):
-    def __init__(self, schema, dynamic_destinations):
+    def __init__(self, schema, dynamic_destinations, type_overrides=None):
       self.schema = schema
       self.dynamic_destinations = dynamic_destinations
+      self.type_overrides = type_overrides
 
     def expand(self, input_dicts):
       if self.dynamic_destinations:
@@ -2816,7 +2827,7 @@ class StorageWriteToBigQuery(PTransform):
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(
-          self.schema)
+          self.schema, self.type_overrides)
       if self.dynamic_destinations:
         type_hint = RowTypeConstraint.from_fields([
             (StorageWriteToBigQuery.DESTINATION, str),
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
index 54c7ca90f01..d3d608b1fc6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -55,15 +55,23 @@ BIG_QUERY_TO_PYTHON_TYPES = {
 
 
 def generate_user_type_from_bq_schema(
-    the_table_schema, selected_fields: 'bigquery.TableSchema' = None) -> type:
+    the_table_schema,
+    selected_fields: 'bigquery.TableSchema' = None,
+    type_overrides=None) -> type:
   """Convert a schema of type TableSchema into a pcollection element.
-      Args:
-        the_table_schema: A BQ schema of type TableSchema
-        selected_fields: if not None, the subset of fields to consider
-      Returns:
-        type: type that can be used to work with pCollections.
-  """
 
+  Args:
+    the_table_schema: A BQ schema of type TableSchema
+    selected_fields: if not None, the subset of fields to consider
+    type_overrides: Optional mapping of BigQuery type names (uppercase)
+      to Python types. These override the default mappings in
+      BIG_QUERY_TO_PYTHON_TYPES. For example:
+      ``{'DATE': datetime.date, 'JSON': dict}``
+
+  Returns:
+    type: type that can be used to work with pCollections.
+  """
+  effective_types = {**BIG_QUERY_TO_PYTHON_TYPES, **(type_overrides or {})}
   the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
       the_table_schema)
   if the_schema == {}:
@@ -72,8 +80,8 @@ def generate_user_type_from_bq_schema(
   for field in the_schema['fields']:
     if selected_fields is not None and field['name'] not in selected_fields:
       continue
-    if field['type'] in BIG_QUERY_TO_PYTHON_TYPES:
-      typ = bq_field_to_type(field['type'], field['mode'])
+    if field['type'] in effective_types:
+      typ = bq_field_to_type(field['type'], field['mode'], type_overrides)
     else:
       raise ValueError(
           f"Encountered "
@@ -85,19 +93,44 @@ def generate_user_type_from_bq_schema(
   return usertype
 
 
-def bq_field_to_type(field, mode):
+def bq_field_to_type(field, mode, type_overrides=None):
+  """Convert a BigQuery field type and mode to a Python type hint.
+
+  Args:
+    field: The BigQuery type name (e.g., 'STRING', 'DATE').
+    mode: The field mode ('NULLABLE', 'REPEATED', 'REQUIRED').
+    type_overrides: Optional mapping of BigQuery type names (uppercase)
+      to Python types. These override the default mappings.
+
+  Returns:
+    The corresponding Python type hint.
+  """
+  effective_types = {**BIG_QUERY_TO_PYTHON_TYPES, **(type_overrides or {})}
   if mode == 'NULLABLE' or mode is None or mode == '':
-    return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
+    return Optional[effective_types[field]]
   elif mode == 'REPEATED':
-    return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
+    return Sequence[effective_types[field]]
   elif mode == 'REQUIRED':
-    return BIG_QUERY_TO_PYTHON_TYPES[field]
+    return effective_types[field]
   else:
     raise ValueError(f"Encountered an unsupported mode: {mode!r}")
 
 
-def convert_to_usertype(table_schema, selected_fields=None):
-  usertype = generate_user_type_from_bq_schema(table_schema, selected_fields)
+def convert_to_usertype(
+    table_schema, selected_fields=None, type_overrides=None):
+  """Convert a BigQuery table schema to a user type.
+
+  Args:
+    table_schema: A BQ schema of type TableSchema
+    selected_fields: if not None, the subset of fields to consider
+    type_overrides: Optional mapping of BigQuery type names (uppercase)
+      to Python types.
+
+  Returns:
+    A ParDo transform that converts dictionaries to the user type.
+  """
+  usertype = generate_user_type_from_bq_schema(
+      table_schema, selected_fields, type_overrides)
   return beam.ParDo(BeamSchemaConversionDoFn(usertype))
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
index 0eb3351ee84..3cf641a2fb0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -337,6 +337,115 @@ class TestBigQueryToSchema(unittest.TestCase):
     self.assertEqual(usertype.__annotations__, expected_annotations)
 
 
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestTypeOverridesSchemaTools(unittest.TestCase):
+  """Tests for type_overrides parameter in bigquery_schema_tools."""
+  def test_bq_field_to_type_with_overrides(self):
+    """Test bq_field_to_type function with type_overrides."""
+    import datetime
+
+    from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type
+
+    # Without overrides, DATE is not supported
+    with self.assertRaises(KeyError):
+      bq_field_to_type("DATE", "REQUIRED")
+
+    # With overrides, DATE works
+    overrides = {"DATE": datetime.date}
+    self.assertEqual(
+        bq_field_to_type("DATE", "REQUIRED", overrides), datetime.date)
+    self.assertEqual(
+        bq_field_to_type("DATE", "NULLABLE", overrides),
+        typing.Optional[datetime.date])
+    self.assertEqual(
+        bq_field_to_type("DATE", "REPEATED", overrides),
+        typing.Sequence[datetime.date])
+
+  def test_bq_field_to_type_overrides_can_use_str(self):
+    """Test that type_overrides can map DATE/DATETIME/JSON to str."""
+    from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type
+
+    overrides = {"DATE": str, "DATETIME": str, "JSON": str}
+    self.assertEqual(bq_field_to_type("DATE", "REQUIRED", overrides), str)
+    self.assertEqual(bq_field_to_type("DATETIME", "REQUIRED", overrides), str)
+    self.assertEqual(bq_field_to_type("JSON", "REQUIRED", overrides), str)
+
+  def test_generate_user_type_with_overrides(self):
+    """Test generate_user_type_from_bq_schema with type_overrides."""
+    import datetime
+
+    schema = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name='id', type='INTEGER', mode="REQUIRED"),
+            bigquery.TableFieldSchema(
+                name='event_date', type='DATE', mode="NULLABLE")
+        ])
+
+    # Without overrides, DATE is not supported
+    with self.assertRaises(ValueError):
+      bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
+
+    # With overrides, DATE works
+    overrides = {"DATE": datetime.date}
+    usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
+        schema, type_overrides=overrides)
+    self.assertEqual(
+        usertype.__annotations__, {
+            'id': np.int64, 'event_date': typing.Optional[datetime.date]
+        })
+
+  def test_generate_user_type_overrides_with_str(self):
+    """Test that type_overrides can map DATE to str."""
+    schema = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name='id', type='INTEGER', mode="REQUIRED"),
+            bigquery.TableFieldSchema(
+                name='event_date', type='DATE', mode="NULLABLE")
+        ])
+
+    overrides = {"DATE": str}
+    usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
+        schema, type_overrides=overrides)
+    self.assertEqual(
+        usertype.__annotations__, {
+            'id': np.int64, 'event_date': typing.Optional[str]
+        })
+
+  def test_convert_to_usertype_with_overrides(self):
+    """Test convert_to_usertype function with type_overrides."""
+    import datetime
+
+    schema = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name='id', type='INTEGER', mode="REQUIRED"),
+            bigquery.TableFieldSchema(
+                name='event_date', type='DATE', mode="NULLABLE")
+        ])
+
+    overrides = {"DATE": datetime.date}
+    transform = bigquery_schema_tools.convert_to_usertype(
+        schema, type_overrides=overrides)
+
+    # The transform should be created successfully
+    self.assertIsNotNone(transform)
+    self.assertIsInstance(transform, beam.ParDo)
+
+  def test_type_overrides_can_override_default_types(self):
+    """Test that type_overrides can override default type mappings."""
+    from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type
+
+    # GEOGRAPHY is in the default mapping as str
+    self.assertEqual(bq_field_to_type("GEOGRAPHY", "REQUIRED"), str)
+
+    # We can override it
+    overrides = {"GEOGRAPHY": bytes}
+    self.assertEqual(
+        bq_field_to_type("GEOGRAPHY", "REQUIRED", overrides), bytes)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index a16d1000304..8dd58cd55a0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -1781,18 +1781,23 @@ bigquery_v2_messages.TableSchema):
       "root", dict_table_schema)
 
 
-def get_beam_typehints_from_tableschema(schema):
+def get_beam_typehints_from_tableschema(schema, type_overrides=None):
   """Extracts Beam Python type hints from the schema.
 
   Args:
     schema (~apache_beam.io.gcp.internal.clients.bigquery.\
 bigquery_v2_messages.TableSchema):
       The TableSchema to extract type hints from.
+    type_overrides (dict): Optional mapping of BigQuery type names (uppercase)
+      to Python types. These override the default mappings in
+      BIGQUERY_TYPE_TO_PYTHON_TYPE. For example:
+      ``{'DATE': datetime.date, 'JSON': dict}``
 
   Returns:
     List[Tuple[str, Any]]: A list of type hints that describe the input schema.
     Nested and repeated fields are supported.
   """
+  effective_types = {**BIGQUERY_TYPE_TO_PYTHON_TYPE, **(type_overrides or {})}
   if not isinstance(schema, (bigquery.TableSchema, bigquery.TableFieldSchema)):
     schema = get_bq_tableschema(schema)
   typehints = []
@@ -1802,9 +1807,9 @@ bigquery_v2_messages.TableSchema):
     if field_type in ["STRUCT", "RECORD"]:
       # Structs can be represented as Beam Rows.
       typehint = RowTypeConstraint.from_fields(
-          get_beam_typehints_from_tableschema(field))
-    elif field_type in BIGQUERY_TYPE_TO_PYTHON_TYPE:
-      typehint = BIGQUERY_TYPE_TO_PYTHON_TYPE[field_type]
+          get_beam_typehints_from_tableschema(field, type_overrides))
+    elif field_type in effective_types:
+      typehint = effective_types[field_type]
     else:
       raise ValueError(
           f"Converting BigQuery type [{field_type}] to "
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 2594e6728e0..078c4216094 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -1248,6 +1248,167 @@ class TestGeographyTypeSupport(unittest.TestCase):
     self.assertIsInstance(result, str)
 
 
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestTypeOverrides(unittest.TestCase):
+  """Tests for type_overrides parameter in BigQuery type mappings."""
+  def test_type_overrides_enables_unsupported_types(self):
+    """Test that type_overrides enables support for DATE/DATETIME/JSON."""
+    import datetime
+    schema = {
+        "fields": [{
+            "name": "date_field", "type": "DATE", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "datetime_field",
+                       "type": "DATETIME",
+                       "mode": "REQUIRED"
+                   }, {
+                       "name": "json_field", "type": "JSON", "mode": "REQUIRED"
+                   }]
+    }
+
+    # Without overrides, these types are not supported
+    with self.assertRaises(ValueError):
+      get_beam_typehints_from_tableschema(schema)
+
+    # With overrides, they work
+    type_overrides = {"DATE": str, "DATETIME": str, "JSON": str}
+    typehints = get_beam_typehints_from_tableschema(schema, type_overrides)
+    self.assertEqual(
+        typehints, [("date_field", str), ("datetime_field", str),
+                    ("json_field", str)])
+
+  def test_type_overrides_with_custom_types(self):
+    """Test type_overrides with custom Python types."""
+    import datetime
+    schema = {
+        "fields": [{
+            "name": "date_field", "type": "DATE", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "datetime_field",
+                       "type": "DATETIME",
+                       "mode": "REQUIRED"
+                   }]
+    }
+
+    type_overrides = {"DATE": datetime.date, "DATETIME": datetime.datetime}
+    typehints = get_beam_typehints_from_tableschema(schema, type_overrides)
+    self.assertEqual(
+        typehints, [("date_field", datetime.date),
+                    ("datetime_field", datetime.datetime)])
+
+  def test_type_overrides_with_modes(self):
+    """Test that type_overrides works with NULLABLE and REPEATED modes."""
+    import datetime
+    schema = {
+        "fields": [{
+            "name": "required_date", "type": "DATE", "mode": "REQUIRED"
+        }, {
+            "name": "optional_date", "type": "DATE", "mode": "NULLABLE"
+        }, {
+            "name": "repeated_dates", "type": "DATE", "mode": "REPEATED"
+        }]
+    }
+
+    type_overrides = {"DATE": datetime.date}
+    typehints = get_beam_typehints_from_tableschema(schema, type_overrides)
+
+    expected = [("required_date", datetime.date),
+                ("optional_date", Optional[datetime.date]),
+                ("repeated_dates", Sequence[datetime.date])]
+    self.assertEqual(typehints, expected)
+
+  def test_type_overrides_mixed_with_default_types(self):
+    """Test type_overrides alongside default type mappings."""
+    import datetime
+    schema = {
+        "fields": [{
+            "name": "date_field", "type": "DATE", "mode": "REQUIRED"
+        }, {
+            "name": "string_field", "type": "STRING", "mode": "REQUIRED"
+        }, {
+            "name": "int_field", "type": "INTEGER", "mode": "REQUIRED"
+        }]
+    }
+
+    type_overrides = {"DATE": datetime.date}
+    typehints = get_beam_typehints_from_tableschema(schema, type_overrides)
+
+    expected = [("date_field", datetime.date), ("string_field", str),
+                ("int_field", np.int64)]
+    self.assertEqual(typehints, expected)
+
+  def test_type_overrides_with_nested_struct(self):
+    """Test that type_overrides is propagated to nested STRUCT fields."""
+    import datetime
+    schema = bigquery.TableSchema()
+
+    # Root field
+    date_field = bigquery.TableFieldSchema()
+    date_field.name = "date_field"
+    date_field.type = "DATE"
+    date_field.mode = "REQUIRED"
+
+    # Nested struct with DATE field
+    struct_field = bigquery.TableFieldSchema()
+    struct_field.name = "nested"
+    struct_field.type = "RECORD"
+    struct_field.mode = "REQUIRED"
+
+    nested_date = bigquery.TableFieldSchema()
+    nested_date.name = "nested_date"
+    nested_date.type = "DATE"
+    nested_date.mode = "REQUIRED"
+    struct_field.fields.append(nested_date)
+
+    schema.fields.append(date_field)
+    schema.fields.append(struct_field)
+
+    type_overrides = {"DATE": datetime.date}
+    typehints = get_beam_typehints_from_tableschema(schema, type_overrides)
+
+    self.assertEqual(len(typehints), 2)
+    self.assertEqual(typehints[0], ("date_field", datetime.date))
+    # The nested field's DATE should also be overridden
+    nested_constraint = typehints[1][1]
+    nested_fields = nested_constraint._fields
+    self.assertEqual(nested_fields[0], ("nested_date", datetime.date))
+
+  def test_type_overrides_can_override_default_types(self):
+    """Test that type_overrides can override default type mappings."""
+    schema = {
+        "fields": [{
+            "name": "geo_field", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        }]
+    }
+
+    # Without overrides, GEOGRAPHY maps to str (default)
+    typehints = get_beam_typehints_from_tableschema(schema, None)
+    self.assertEqual(typehints, [("geo_field", str)])
+
+    # With overrides, we can change it
+    typehints_override = get_beam_typehints_from_tableschema(
+        schema, {"GEOGRAPHY": bytes})
+    self.assertEqual(typehints_override, [("geo_field", bytes)])
+
+  def test_type_overrides_json_to_dict(self):
+    """Test using type_overrides to map JSON to dict."""
+    schema = {"fields": [{"name": "data", "type": "JSON", "mode": "NULLABLE"}]}
+
+    # Without overrides, JSON is not supported
+    with self.assertRaises(ValueError):
+      get_beam_typehints_from_tableschema(schema)
+
+    # With overrides, can map to str
+    typehints_str = get_beam_typehints_from_tableschema(schema, {"JSON": str})
+    self.assertEqual(typehints_str, [("data", Optional[str])])
+
+    # Or map to dict
+    typehints_dict = get_beam_typehints_from_tableschema(schema, {"JSON": 
dict})
+    self.assertEqual(typehints_dict, [("data", Optional[dict])])
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to