uranusjr commented on code in PR #34112: URL: https://github.com/apache/airflow/pull/34112#discussion_r1316516583
##########
airflow/utils/sqlalchemy.py:
##########
@@ -105,21 +105,59 @@ def load_dialect_impl(self, dialect):
         return super().load_dialect_impl(dialect)
 
 
-class ExtendedJSON(TypeDecorator):
+class JsonFieldMixin:
     """
-    A version of the JSON column that uses the Airflow extended JSON serialization.
+    A mixin TypeDecorator for a JSON column which help resolve types for different DB backends.
 
-    See airflow.serialization.
+    :param use_jsonb: Whether or not use JSONB type for Postgres backend. Default to False
     """
 
+    # Set as ``TEXT`` make FAB happy and prevent to get any error like 'Column %s Type not supported',
+    # SQLAlchemy will use ``load_dialect_impl`` anyway.
     impl = Text
 
-    cache_ok = True
+    def __init__(self, *args, use_jsonb: bool = False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.use_jsonb = use_jsonb
 
     def load_dialect_impl(self, dialect) -> TypeEngine:
-        if dialect.name != "mssql":
-            return dialect.type_descriptor(JSON)
-        return dialect.type_descriptor(UnicodeText)
+        if dialect.name == "postgres" and self.use_jsonb:
+            return dialect.type_descriptor(postgresql.JSONB)
+        elif dialect.name == "mssql":
+            return dialect.type_descriptor(UnicodeText)
+        return dialect.type_descriptor(JSON)
+
+
+class JsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the default JSON serialization.
+
+    See airflow.serialization.
+    """

Review Comment:
   And this one should explain why we don’t use `sqlalchemy_jsonfield`.


##########
airflow/utils/sqlalchemy.py:
##########
@@ -105,21 +105,59 @@ def load_dialect_impl(self, dialect):
         return super().load_dialect_impl(dialect)
 
 
-class ExtendedJSON(TypeDecorator):
+class JsonFieldMixin:
     """
-    A version of the JSON column that uses the Airflow extended JSON serialization.
+    A mixin TypeDecorator for a JSON column which help resolve types for different DB backends.
 
-    See airflow.serialization.
+    :param use_jsonb: Whether or not use JSONB type for Postgres backend. Default to False
     """
 
+    # Set as ``TEXT`` make FAB happy and prevent to get any error like 'Column %s Type not supported',
+    # SQLAlchemy will use ``load_dialect_impl`` anyway.
     impl = Text
 
-    cache_ok = True
+    def __init__(self, *args, use_jsonb: bool = False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.use_jsonb = use_jsonb
 
     def load_dialect_impl(self, dialect) -> TypeEngine:
-        if dialect.name != "mssql":
-            return dialect.type_descriptor(JSON)
-        return dialect.type_descriptor(UnicodeText)
+        if dialect.name == "postgres" and self.use_jsonb:
+            return dialect.type_descriptor(postgresql.JSONB)
+        elif dialect.name == "mssql":
+            return dialect.type_descriptor(UnicodeText)
+        return dialect.type_descriptor(JSON)
+
+
+class JsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the default JSON serialization.
+
+    See airflow.serialization.
+    """

Review Comment:
   And this one should explain why we don’t use `sqlalchemy_jsonfield`.


##########
airflow/utils/sqlalchemy.py:
##########
@@ -105,21 +105,59 @@ def load_dialect_impl(self, dialect):
         return super().load_dialect_impl(dialect)
 
 
-class ExtendedJSON(TypeDecorator):
+class JsonFieldMixin:
     """
-    A version of the JSON column that uses the Airflow extended JSON serialization.
+    A mixin TypeDecorator for a JSON column which help resolve types for different DB backends.
 
-    See airflow.serialization.
+    :param use_jsonb: Whether or not use JSONB type for Postgres backend. Default to False
     """
 
+    # Set as ``TEXT`` make FAB happy and prevent to get any error like 'Column %s Type not supported',
+    # SQLAlchemy will use ``load_dialect_impl`` anyway.
     impl = Text
 
-    cache_ok = True
+    def __init__(self, *args, use_jsonb: bool = False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.use_jsonb = use_jsonb
 
     def load_dialect_impl(self, dialect) -> TypeEngine:
-        if dialect.name != "mssql":
-            return dialect.type_descriptor(JSON)
-        return dialect.type_descriptor(UnicodeText)
+        if dialect.name == "postgres" and self.use_jsonb:
+            return dialect.type_descriptor(postgresql.JSONB)
+        elif dialect.name == "mssql":
+            return dialect.type_descriptor(UnicodeText)
+        return dialect.type_descriptor(JSON)
+
+
+class JsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the default JSON serialization.
+
+    See airflow.serialization.
+    """
+
+    cache_ok = True
+
+    def process_bind_param(self, value, dialect):
+        if value is None or dialect != "mssql":
+            return value
+        # If the database does not have native JSON support, encode it as a string.
+        return json.dumps(value)
+
+    def process_result_value(self, value, dialect):
+        if value is None or dialect != "mssql":
+            return value
+        # Deserialize from a string first if database does not have native JSON support.
+        return json.loads(value)
+
+
+class ExtendedJsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the Airflow extended JSON serialization.
+
+    See airflow.serialization.
+    """

Review Comment:
   This docstring should explain the difference between the two fields



--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org