This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 766726f2e3 Fix `PostgresToGCSOperator` does not allow nested JSON (#23063)
766726f2e3 is described below

commit 766726f2e3a282fcd2662f5dc6e9926dc38a6540
Author: pierrejeambrun <[email protected]>
AuthorDate: Mon May 9 00:06:23 2022 +0200

    Fix `PostgresToGCSOperator` does not allow nested JSON (#23063)
    
    * Avoid double json.dumps for json data export in PostgresToGCSOperator.
    
    * Fix CI
---
 .../google/cloud/transfers/mssql_to_gcs.py         |  2 +-
 .../google/cloud/transfers/mysql_to_gcs.py         |  2 +-
 .../google/cloud/transfers/oracle_to_gcs.py        |  2 +-
 .../google/cloud/transfers/postgres_to_gcs.py      |  8 +++++--
 .../google/cloud/transfers/presto_to_gcs.py        |  2 +-
 .../providers/google/cloud/transfers/sql_to_gcs.py | 16 ++++++++------
 .../google/cloud/transfers/trino_to_gcs.py         |  2 +-
 .../google/cloud/transfers/test_postgres_to_gcs.py | 25 +++++++++++++++-------
 .../google/cloud/transfers/test_sql_to_gcs.py      |  2 +-
 9 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
index 0bba2b558a..113b0713d1 100644
--- a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -80,7 +80,7 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
         }
 
     @classmethod
-    def convert_type(cls, value, schema_type):
+    def convert_type(cls, value, schema_type, **kwargs):
         """
         Takes a value from MSSQL, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.
diff --git a/airflow/providers/google/cloud/transfers/mysql_to_gcs.py b/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
index 57176a2826..2e72eaa774 100644
--- a/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
@@ -92,7 +92,7 @@ class MySQLToGCSOperator(BaseSQLToGCSOperator):
             'mode': field_mode,
         }
 
-    def convert_type(self, value, schema_type: str):
+    def convert_type(self, value, schema_type: str, **kwargs):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.
diff --git a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
index bebe2a14b2..3306c98010 100644
--- a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
@@ -85,7 +85,7 @@ class OracleToGCSOperator(BaseSQLToGCSOperator):
             'mode': field_mode,
         }
 
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """
         Takes a value from Oracle db, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.
diff --git a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
index ce77d7b87b..3f3012514d 100644
--- a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
@@ -128,13 +128,17 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
             'mode': 'REPEATED' if field[1] in (1009, 1005, 1007, 1016) else 'NULLABLE',
         }
 
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, stringify_dict=True):
         """
         Takes a value from Postgres, and converts it to a value that's safe for
         JSON/Google Cloud Storage/BigQuery.
         Timezone aware Datetime are converted to UTC seconds.
         Unaware Datetime, Date and Time are converted to ISO formatted strings.
         Decimals are converted to floats.
+
+        :param value: Postgres column value.
+        :param schema_type: BigQuery data type.
+        :param stringify_dict: Specify whether to convert dict to string.
         """
         if isinstance(value, datetime.datetime):
             iso_format_value = value.isoformat()
@@ -149,7 +153,7 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
                 hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
             )
             return str(time_delta)
-        if isinstance(value, dict):
+        if stringify_dict and isinstance(value, dict):
             return json.dumps(value)
         if isinstance(value, Decimal):
             return float(value)
diff --git a/airflow/providers/google/cloud/transfers/presto_to_gcs.py b/airflow/providers/google/cloud/transfers/presto_to_gcs.py
index 19928220d2..1b2be5e091 100644
--- a/airflow/providers/google/cloud/transfers/presto_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/presto_to_gcs.py
@@ -195,7 +195,7 @@ class PrestoToGCSOperator(BaseSQLToGCSOperator):
 
         return {"name": field[0], "type": new_field_type}
 
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """
         Do nothing. Presto uses JSON on the transport layer, so types are simple.
 
diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index bd9026a902..0df46bc777 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -150,9 +150,12 @@ class BaseSQLToGCSOperator(BaseOperator):
             file_to_upload['file_handle'].close()
             counter += 1
 
-    def convert_types(self, schema, col_type_dict, row) -> list:
+    def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
         """Convert values from DBAPI to output-friendly formats."""
-        return [self.convert_type(value, col_type_dict.get(name)) for name, value in zip(schema, row)]
+        return [
+            self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
+            for name, value in zip(schema, row)
+        ]
 
     def _write_local_data_files(self, cursor):
         """
@@ -186,21 +189,20 @@ class BaseSQLToGCSOperator(BaseOperator):
             parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)
 
         for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats.
-            # Convert binary type object to string encoded with base64.
-            row = self.convert_types(schema, col_type_dict, row)
-
             if self.export_format == 'csv':
+                row = self.convert_types(schema, col_type_dict, row)
                 if self.null_marker is not None:
                     row = [value if value is not None else self.null_marker for value in row]
                 csv_writer.writerow(row)
             elif self.export_format == 'parquet':
+                row = self.convert_types(schema, col_type_dict, row)
                 if self.null_marker is not None:
                     row = [value if value is not None else self.null_marker for value in row]
                 row_pydic = {col: [value] for col, value in zip(schema, row)}
                 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
                 parquet_writer.write_table(tbl)
             else:
+                row = self.convert_types(schema, col_type_dict, row, stringify_dict=False)
                 row_dict = dict(zip(schema, row))
 
                 tmp_file_handle.write(
@@ -273,7 +275,7 @@ class BaseSQLToGCSOperator(BaseOperator):
         """Convert a DBAPI field to BigQuery schema format."""
 
     @abc.abstractmethod
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """Convert a value from DBAPI to output-friendly formats."""
 
     def _get_col_type_dict(self):
diff --git a/airflow/providers/google/cloud/transfers/trino_to_gcs.py b/airflow/providers/google/cloud/transfers/trino_to_gcs.py
index a4635e8813..6d2e2e223b 100644
--- a/airflow/providers/google/cloud/transfers/trino_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/trino_to_gcs.py
@@ -195,7 +195,7 @@ class TrinoToGCSOperator(BaseSQLToGCSOperator):
 
         return {"name": field[0], "type": new_field_type}
 
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, **kwargs):
         """
         Do nothing. Trino uses JSON on the transport layer, so types are simple.
 
diff --git a/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py b/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
index d8621d8dbc..ff653292c4 100644
--- a/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
@@ -35,14 +35,15 @@ BUCKET = 'gs://test'
 FILENAME = 'test_{}.ndjson'
 
 NDJSON_LINES = [
-    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
-    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
-    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+    b'{"some_json": {"firtname": "John", "lastname": "Smith", "nested_dict": {"a": null, "b": "something"}}, "some_num": 42, "some_str": "mock_row_content_1"}\n',  # noqa
+    b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_json": {}, "some_num": 44, "some_str": "mock_row_content_3"}\n',
 ]
 SCHEMA_FILENAME = 'schema_test.json'
 SCHEMA_JSON = (
     b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, '
-    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
+    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, '
+    b'{"mode": "NULLABLE", "name": "some_json", "type": "STRING"}]'
 )
 
 
@@ -55,16 +56,24 @@ class TestPostgresToGoogleCloudStorageOperator(unittest.TestCase):
             with conn.cursor() as cur:
                 for table in TABLES:
                     cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
-                    cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer);")
+                    cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer, some_json json);")
 
                 cur.execute(
-                    "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_1', 42)
+                    "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+                    (
+                        'mock_row_content_1',
+                        42,
+                        '{"lastname": "Smith", "firtname": "John", \
+                          "nested_dict": {"a": null, "b": "something"}}',
+                    ),
                 )
                 cur.execute(
-                    "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_2', 43)
+                    "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+                    ('mock_row_content_2', 43, '{}'),
                 )
                 cur.execute(
-                    "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_3', 44)
+                    "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+                    ('mock_row_content_3', 44, '{}'),
                 )
 
     @classmethod
diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
index 668e8e48e4..525e04bd0e 100644
--- a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
@@ -70,7 +70,7 @@ class DummySQLToGCSOperator(BaseSQLToGCSOperator):
             'mode': 'NULLABLE',
         }
 
-    def convert_type(self, value, schema_type):
+    def convert_type(self, value, schema_type, stringify_dict):
         return 'convert_type_return_value'
 
     def query(self):

Reply via email to