This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 553ddf3b992 Fix MetastoreHivePartitionSensor failing due to duplicate aliases (#45001)
553ddf3b992 is described below

commit 553ddf3b992d4335aadbd6a8dd653f7575e61477
Author: Christian Yarros <[email protected]>
AuthorDate: Wed Dec 18 13:27:46 2024 +0000

    Fix MetastoreHivePartitionSensor failing due to duplicate aliases (#45001)
    
    * fix MetastoreHivePartitionSensor with updated query to avoid duplicate aliases
    
    * Update TestMetastoreHivePartitionSensor
    
    * Fix MetastoreHivePartitionSensor pytests
---
 .../google/cloud/hooks/dataproc_metastore.py       | 63 ++++++++++++++--------
 .../google/cloud/hooks/test_dataproc_metastore.py  | 23 ++++----
 2 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
index b0168ddbfbb..35203979848 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -60,11 +60,16 @@ class DataprocMetastoreHook(GoogleBaseHook):
 
     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Wait for long-lasting operation to complete."""
+        self.log.info("Waiting for operation (timeout: %s seconds)", timeout)
+
         try:
-            return operation.result(timeout=timeout)
-        except Exception:
+            result = operation.result(timeout=timeout)
+            self.log.info("Operation completed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Operation failed: %s", str(e))
             error = operation.exception(timeout=timeout)
-            raise AirflowException(error)
+            raise AirflowException(f"Operation failed: {error}")
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_backup(
@@ -669,23 +674,37 @@ class DataprocMetastoreHook(GoogleBaseHook):
         # because dictionaries are ordered since Python 3.7+
         _partitions = list(dict.fromkeys(partition_names)) if partition_names else []
 
-        query = f"""
-                SELECT *
-                FROM PARTITIONS
-                INNER JOIN TBLS
-                ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-                WHERE
-                    TBLS.TBL_NAME = '{table}'"""
         if _partitions:
-            query += f"""
-                    AND PARTITIONS.PART_NAME IN ({', '.join(f"'{p}'" for p in _partitions)})"""
-        query += ";"
-
-        client = self.get_dataproc_metastore_client_v1beta()
-        result = client.query_metadata(
-            request={
-                "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
-                "query": query,
-            }
-        )
-        return result
+            partition_list = ", ".join(f"'{p}'" for p in _partitions)
+            query = f"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{table}'
+        AND PARTITIONS.PART_NAME IN ({partition_list});"""
+        else:
+            query = f"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{table}';"""
+
+        request = {
+            "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
+            "query": query,
+        }
+
+        self.log.info("Prepared request:")
+        self.log.info(request)
+
+        # Execute query
+        try:
+            self.log.info("Getting Dataproc Metastore client (v1beta)...")
+            client = self.get_dataproc_metastore_client_v1beta()
+            self.log.info("Executing query_metadata...")
+            result = client.query_metadata(request=request)
+            self.log.info("Query executed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Error executing query_metadata: %s", str(e))
+            raise
diff --git a/providers/tests/google/cloud/hooks/test_dataproc_metastore.py b/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
index 600693fa0f3..d31af293d47 100644
--- a/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
+++ b/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
@@ -60,20 +60,17 @@ TEST_TABLE_ID: str = "test_table"
 TEST_PARTITION_NAME = "column=value"
 TEST_SUBPARTITION_NAME = "column1=value1/column2=value2"
 TEST_PARTITIONS_QUERY_ALL = """
-                SELECT *
-                FROM PARTITIONS
-                INNER JOIN TBLS
-                ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-                WHERE
-                    TBLS.TBL_NAME = '{}';"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{}';"""
+
 TEST_PARTITIONS_QUERY = """
-                SELECT *
-                FROM PARTITIONS
-                INNER JOIN TBLS
-                ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-                WHERE
-                    TBLS.TBL_NAME = '{}'
-                    AND PARTITIONS.PART_NAME IN ({});"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{}'
+        AND PARTITIONS.PART_NAME IN ({});"""
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"
 

Reply via email to