This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 553ddf3b992 Fix MetastoreHivePartitionSensor failing due to duplicate aliases (#45001)
553ddf3b992 is described below
commit 553ddf3b992d4335aadbd6a8dd653f7575e61477
Author: Christian Yarros <[email protected]>
AuthorDate: Wed Dec 18 13:27:46 2024 +0000
Fix MetastoreHivePartitionSensor failing due to duplicate aliases (#45001)
* fix MetastoreHivePartitionSensor with updated query to avoid duplicate
aliases
* Update TestMetastoreHivePartitionSensor
* Fix MetastoreHivePartitionSensor pytests
---
.../google/cloud/hooks/dataproc_metastore.py | 63 ++++++++++++++--------
.../google/cloud/hooks/test_dataproc_metastore.py | 23 ++++----
2 files changed, 51 insertions(+), 35 deletions(-)
diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
index b0168ddbfbb..35203979848 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -60,11 +60,16 @@ class DataprocMetastoreHook(GoogleBaseHook):
def wait_for_operation(self, timeout: float | None, operation: Operation):
"""Wait for long-lasting operation to complete."""
+ self.log.info("Waiting for operation (timeout: %s seconds)", timeout)
+
try:
- return operation.result(timeout=timeout)
- except Exception:
+ result = operation.result(timeout=timeout)
+ self.log.info("Operation completed successfully")
+ return result
+ except Exception as e:
+ self.log.error("Operation failed: %s", str(e))
error = operation.exception(timeout=timeout)
- raise AirflowException(error)
+ raise AirflowException(f"Operation failed: {error}")
@GoogleBaseHook.fallback_to_default_project_id
def create_backup(
@@ -669,23 +674,37 @@ class DataprocMetastoreHook(GoogleBaseHook):
# because dictionaries are ordered since Python 3.7+
_partitions = list(dict.fromkeys(partition_names)) if partition_names else []
- query = f"""
- SELECT *
- FROM PARTITIONS
- INNER JOIN TBLS
- ON PARTITIONS.TBL_ID = TBLS.TBL_ID
- WHERE
- TBLS.TBL_NAME = '{table}'"""
if _partitions:
- query += f"""
- AND PARTITIONS.PART_NAME IN ({', '.join(f"'{p}'" for p in _partitions)})"""
- query += ";"
-
- client = self.get_dataproc_metastore_client_v1beta()
- result = client.query_metadata(
- request={
- "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
- "query": query,
- }
- )
- return result
+ partition_list = ", ".join(f"'{p}'" for p in _partitions)
+ query = f"""
+ SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+ FROM PARTITIONS
+ INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+ WHERE TBLS.TBL_NAME = '{table}'
+ AND PARTITIONS.PART_NAME IN ({partition_list});"""
+ else:
+ query = f"""
+ SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+ FROM PARTITIONS
+ INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+ WHERE TBLS.TBL_NAME = '{table}';"""
+
+ request = {
+ "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
+ "query": query,
+ }
+
+ self.log.info("Prepared request:")
+ self.log.info(request)
+
+ # Execute query
+ try:
+ self.log.info("Getting Dataproc Metastore client (v1beta)...")
+ client = self.get_dataproc_metastore_client_v1beta()
+ self.log.info("Executing query_metadata...")
+ result = client.query_metadata(request=request)
+ self.log.info("Query executed successfully")
+ return result
+ except Exception as e:
+ self.log.error("Error executing query_metadata: %s", str(e))
+ raise
diff --git a/providers/tests/google/cloud/hooks/test_dataproc_metastore.py b/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
index 600693fa0f3..d31af293d47 100644
--- a/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
+++ b/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
@@ -60,20 +60,17 @@ TEST_TABLE_ID: str = "test_table"
TEST_PARTITION_NAME = "column=value"
TEST_SUBPARTITION_NAME = "column1=value1/column2=value2"
TEST_PARTITIONS_QUERY_ALL = """
- SELECT *
- FROM PARTITIONS
- INNER JOIN TBLS
- ON PARTITIONS.TBL_ID = TBLS.TBL_ID
- WHERE
- TBLS.TBL_NAME = '{}';"""
+ SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+ FROM PARTITIONS
+ INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+ WHERE TBLS.TBL_NAME = '{}';"""
+
TEST_PARTITIONS_QUERY = """
- SELECT *
- FROM PARTITIONS
- INNER JOIN TBLS
- ON PARTITIONS.TBL_ID = TBLS.TBL_ID
- WHERE
- TBLS.TBL_NAME = '{}'
- AND PARTITIONS.PART_NAME IN ({});"""
+ SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+ FROM PARTITIONS
+ INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+ WHERE TBLS.TBL_NAME = '{}'
+ AND PARTITIONS.PART_NAME IN ({});"""
BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"