uranusjr commented on code in PR #34112:
URL: https://github.com/apache/airflow/pull/34112#discussion_r1316516583


##########
airflow/utils/sqlalchemy.py:
##########
@@ -105,21 +105,59 @@ def load_dialect_impl(self, dialect):
         return super().load_dialect_impl(dialect)
 
 
-class ExtendedJSON(TypeDecorator):
+class JsonFieldMixin:
     """
-    A version of the JSON column that uses the Airflow extended JSON serialization.
+    A mixin TypeDecorator for a JSON column which help resolve types for different DB backends.
 
-    See airflow.serialization.
+    :param use_jsonb: Whether or not use JSONB type for Postgres backend. Default to False
     """
 
+    # Set as ``TEXT`` make FAB happy and prevent to get any error like 'Column %s Type not supported',
+    # SQLAlchemy will use ``load_dialect_impl`` anyway.
     impl = Text
 
-    cache_ok = True
+    def __init__(self, *args, use_jsonb: bool = False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.use_jsonb = use_jsonb
 
     def load_dialect_impl(self, dialect) -> TypeEngine:
-        if dialect.name != "mssql":
-            return dialect.type_descriptor(JSON)
-        return dialect.type_descriptor(UnicodeText)
+        if dialect.name == "postgres" and self.use_jsonb:
+            return dialect.type_descriptor(postgresql.JSONB)
+        elif dialect.name == "mssql":
+            return dialect.type_descriptor(UnicodeText)
+        return dialect.type_descriptor(JSON)
+
+
+class JsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the default JSON serialization.
+
+    See airflow.serialization.
+    """

Review Comment:
   This docstring (for `JsonField`) should also explain why we don’t use `sqlalchemy_jsonfield`.



##########
airflow/utils/sqlalchemy.py:
##########
@@ -105,21 +105,59 @@ def load_dialect_impl(self, dialect):
         return super().load_dialect_impl(dialect)
 
 
-class ExtendedJSON(TypeDecorator):
+class JsonFieldMixin:
     """
-    A version of the JSON column that uses the Airflow extended JSON serialization.
+    A mixin TypeDecorator for a JSON column which help resolve types for different DB backends.
 
-    See airflow.serialization.
+    :param use_jsonb: Whether or not use JSONB type for Postgres backend. Default to False
     """
 
+    # Set as ``TEXT`` make FAB happy and prevent to get any error like 'Column %s Type not supported',
+    # SQLAlchemy will use ``load_dialect_impl`` anyway.
     impl = Text
 
-    cache_ok = True
+    def __init__(self, *args, use_jsonb: bool = False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.use_jsonb = use_jsonb
 
     def load_dialect_impl(self, dialect) -> TypeEngine:
-        if dialect.name != "mssql":
-            return dialect.type_descriptor(JSON)
-        return dialect.type_descriptor(UnicodeText)
+        if dialect.name == "postgres" and self.use_jsonb:
+            return dialect.type_descriptor(postgresql.JSONB)
+        elif dialect.name == "mssql":
+            return dialect.type_descriptor(UnicodeText)
+        return dialect.type_descriptor(JSON)
+
+
+class JsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the default JSON serialization.
+
+    See airflow.serialization.
+    """

Review Comment:
   This docstring (for `JsonField`) should also explain why we don’t use `sqlalchemy_jsonfield`.



##########
airflow/utils/sqlalchemy.py:
##########
@@ -105,21 +105,59 @@ def load_dialect_impl(self, dialect):
         return super().load_dialect_impl(dialect)
 
 
-class ExtendedJSON(TypeDecorator):
+class JsonFieldMixin:
     """
-    A version of the JSON column that uses the Airflow extended JSON serialization.
+    A mixin TypeDecorator for a JSON column which help resolve types for different DB backends.
 
-    See airflow.serialization.
+    :param use_jsonb: Whether or not use JSONB type for Postgres backend. Default to False
     """
 
+    # Set as ``TEXT`` make FAB happy and prevent to get any error like 'Column %s Type not supported',
+    # SQLAlchemy will use ``load_dialect_impl`` anyway.
     impl = Text
 
-    cache_ok = True
+    def __init__(self, *args, use_jsonb: bool = False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.use_jsonb = use_jsonb
 
     def load_dialect_impl(self, dialect) -> TypeEngine:
-        if dialect.name != "mssql":
-            return dialect.type_descriptor(JSON)
-        return dialect.type_descriptor(UnicodeText)
+        if dialect.name == "postgres" and self.use_jsonb:
+            return dialect.type_descriptor(postgresql.JSONB)
+        elif dialect.name == "mssql":
+            return dialect.type_descriptor(UnicodeText)
+        return dialect.type_descriptor(JSON)
+
+
+class JsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the default JSON serialization.
+
+    See airflow.serialization.
+    """
+
+    cache_ok = True
+
+    def process_bind_param(self, value, dialect):
+        if value is None or dialect != "mssql":
+            return value
+        # If the database does not have native JSON support, encode it as a string.
+        return json.dumps(value)
+
+    def process_result_value(self, value, dialect):
+        if value is None or dialect != "mssql":
+            return value
+        # Deserialize from a string first if database does not have native JSON support.
+        return json.loads(value)
+
+
+class ExtendedJsonField(JsonFieldMixin, TypeDecorator):
+    """
+    A version of the JSON column that uses the Airflow extended JSON serialization.
+
+    See airflow.serialization.
+    """

Review Comment:
   This docstring should explain the difference between the two fields (`JsonField` and `ExtendedJsonField`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to