This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 766726f2e3 Fix `PostgresToGCSOperator` does not allow nested JSON (#23063)
766726f2e3 is described below
commit 766726f2e3a282fcd2662f5dc6e9926dc38a6540
Author: pierrejeambrun <[email protected]>
AuthorDate: Mon May 9 00:06:23 2022 +0200
Fix `PostgresToGCSOperator` does not allow nested JSON (#23063)
* Avoid double json.dumps for json data export in PostgresToGCSOperator.
* Fix CI
---
.../google/cloud/transfers/mssql_to_gcs.py | 2 +-
.../google/cloud/transfers/mysql_to_gcs.py | 2 +-
.../google/cloud/transfers/oracle_to_gcs.py | 2 +-
.../google/cloud/transfers/postgres_to_gcs.py | 8 +++++--
.../google/cloud/transfers/presto_to_gcs.py | 2 +-
.../providers/google/cloud/transfers/sql_to_gcs.py | 16 ++++++++------
.../google/cloud/transfers/trino_to_gcs.py | 2 +-
.../google/cloud/transfers/test_postgres_to_gcs.py | 25 +++++++++++++++-------
.../google/cloud/transfers/test_sql_to_gcs.py | 2 +-
9 files changed, 38 insertions(+), 23 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
index 0bba2b558a..113b0713d1 100644
--- a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -80,7 +80,7 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
}
@classmethod
- def convert_type(cls, value, schema_type):
+ def convert_type(cls, value, schema_type, **kwargs):
"""
Takes a value from MSSQL, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
diff --git a/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
b/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
index 57176a2826..2e72eaa774 100644
--- a/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mysql_to_gcs.py
@@ -92,7 +92,7 @@ class MySQLToGCSOperator(BaseSQLToGCSOperator):
'mode': field_mode,
}
- def convert_type(self, value, schema_type: str):
+ def convert_type(self, value, schema_type: str, **kwargs):
"""
Takes a value from MySQLdb, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
diff --git a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
index bebe2a14b2..3306c98010 100644
--- a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py
@@ -85,7 +85,7 @@ class OracleToGCSOperator(BaseSQLToGCSOperator):
'mode': field_mode,
}
- def convert_type(self, value, schema_type):
+ def convert_type(self, value, schema_type, **kwargs):
"""
Takes a value from Oracle db, and converts it to a value that's safe
for
JSON/Google Cloud Storage/BigQuery.
diff --git a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
index ce77d7b87b..3f3012514d 100644
--- a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
@@ -128,13 +128,17 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
'mode': 'REPEATED' if field[1] in (1009, 1005, 1007, 1016) else
'NULLABLE',
}
- def convert_type(self, value, schema_type):
+ def convert_type(self, value, schema_type, stringify_dict=True):
"""
Takes a value from Postgres, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Timezone aware Datetime are converted to UTC seconds.
Unaware Datetime, Date and Time are converted to ISO formatted strings.
Decimals are converted to floats.
+
+ :param value: Postgres column value.
+ :param schema_type: BigQuery data type.
+ :param stringify_dict: Specify whether to convert dict to string.
"""
if isinstance(value, datetime.datetime):
iso_format_value = value.isoformat()
@@ -149,7 +153,7 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
hours=formatted_time.tm_hour, minutes=formatted_time.tm_min,
seconds=formatted_time.tm_sec
)
return str(time_delta)
- if isinstance(value, dict):
+ if stringify_dict and isinstance(value, dict):
return json.dumps(value)
if isinstance(value, Decimal):
return float(value)
diff --git a/airflow/providers/google/cloud/transfers/presto_to_gcs.py
b/airflow/providers/google/cloud/transfers/presto_to_gcs.py
index 19928220d2..1b2be5e091 100644
--- a/airflow/providers/google/cloud/transfers/presto_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/presto_to_gcs.py
@@ -195,7 +195,7 @@ class PrestoToGCSOperator(BaseSQLToGCSOperator):
return {"name": field[0], "type": new_field_type}
- def convert_type(self, value, schema_type):
+ def convert_type(self, value, schema_type, **kwargs):
"""
Do nothing. Presto uses JSON on the transport layer, so types are
simple.
diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index bd9026a902..0df46bc777 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -150,9 +150,12 @@ class BaseSQLToGCSOperator(BaseOperator):
file_to_upload['file_handle'].close()
counter += 1
- def convert_types(self, schema, col_type_dict, row) -> list:
+ def convert_types(self, schema, col_type_dict, row, stringify_dict=False)
-> list:
"""Convert values from DBAPI to output-friendly formats."""
- return [self.convert_type(value, col_type_dict.get(name)) for name,
value in zip(schema, row)]
+ return [
+ self.convert_type(value, col_type_dict.get(name),
stringify_dict=stringify_dict)
+ for name, value in zip(schema, row)
+ ]
def _write_local_data_files(self, cursor):
"""
@@ -186,21 +189,20 @@ class BaseSQLToGCSOperator(BaseOperator):
parquet_writer = self._configure_parquet_file(tmp_file_handle,
parquet_schema)
for row in cursor:
- # Convert datetime objects to utc seconds, and decimals to floats.
- # Convert binary type object to string encoded with base64.
- row = self.convert_types(schema, col_type_dict, row)
-
if self.export_format == 'csv':
+ row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
row = [value if value is not None else self.null_marker
for value in row]
csv_writer.writerow(row)
elif self.export_format == 'parquet':
+ row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
row = [value if value is not None else self.null_marker
for value in row]
row_pydic = {col: [value] for col, value in zip(schema, row)}
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
parquet_writer.write_table(tbl)
else:
+ row = self.convert_types(schema, col_type_dict, row,
stringify_dict=False)
row_dict = dict(zip(schema, row))
tmp_file_handle.write(
@@ -273,7 +275,7 @@ class BaseSQLToGCSOperator(BaseOperator):
"""Convert a DBAPI field to BigQuery schema format."""
@abc.abstractmethod
- def convert_type(self, value, schema_type):
+ def convert_type(self, value, schema_type, **kwargs):
"""Convert a value from DBAPI to output-friendly formats."""
def _get_col_type_dict(self):
diff --git a/airflow/providers/google/cloud/transfers/trino_to_gcs.py
b/airflow/providers/google/cloud/transfers/trino_to_gcs.py
index a4635e8813..6d2e2e223b 100644
--- a/airflow/providers/google/cloud/transfers/trino_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/trino_to_gcs.py
@@ -195,7 +195,7 @@ class TrinoToGCSOperator(BaseSQLToGCSOperator):
return {"name": field[0], "type": new_field_type}
- def convert_type(self, value, schema_type):
+ def convert_type(self, value, schema_type, **kwargs):
"""
Do nothing. Trino uses JSON on the transport layer, so types are
simple.
diff --git a/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
b/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
index d8621d8dbc..ff653292c4 100644
--- a/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
@@ -35,14 +35,15 @@ BUCKET = 'gs://test'
FILENAME = 'test_{}.ndjson'
NDJSON_LINES = [
- b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
- b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
- b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+ b'{"some_json": {"firtname": "John", "lastname": "Smith", "nested_dict":
{"a": null, "b": "something"}}, "some_num": 42, "some_str":
"mock_row_content_1"}\n', # noqa
+ b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n',
+ b'{"some_json": {}, "some_num": 44, "some_str": "mock_row_content_3"}\n',
]
SCHEMA_FILENAME = 'schema_test.json'
SCHEMA_JSON = (
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, '
- b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
+ b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, '
+ b'{"mode": "NULLABLE", "name": "some_json", "type": "STRING"}]'
)
@@ -55,16 +56,24 @@ class
TestPostgresToGoogleCloudStorageOperator(unittest.TestCase):
with conn.cursor() as cur:
for table in TABLES:
cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
- cur.execute(f"CREATE TABLE {table}(some_str varchar,
some_num integer);")
+ cur.execute(f"CREATE TABLE {table}(some_str varchar,
some_num integer, some_json json);")
cur.execute(
- "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);",
('mock_row_content_1', 42)
+ "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+ (
+ 'mock_row_content_1',
+ 42,
+ '{"lastname": "Smith", "firtname": "John", \
+ "nested_dict": {"a": null, "b": "something"}}',
+ ),
)
cur.execute(
- "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);",
('mock_row_content_2', 43)
+ "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+ ('mock_row_content_2', 43, '{}'),
)
cur.execute(
- "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);",
('mock_row_content_3', 44)
+ "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+ ('mock_row_content_3', 44, '{}'),
)
@classmethod
diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
index 668e8e48e4..525e04bd0e 100644
--- a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
@@ -70,7 +70,7 @@ class DummySQLToGCSOperator(BaseSQLToGCSOperator):
'mode': 'NULLABLE',
}
- def convert_type(self, value, schema_type):
+ def convert_type(self, value, schema_type, stringify_dict):
return 'convert_type_return_value'
def query(self):