nailo2c opened a new pull request, #52926:
URL: https://github.com/apache/airflow/pull/52926

   Closes: #35012
   
   # Why
   
   1. Pass `pydruid`'s `ssl_verify_cert` option through `DruidDbApiHook.get_conn`, so it can be set from the connection extras.
   2. `get_pandas_df` was already implemented in #35494.
   
   # How
   
   ```python
   druid_broker_conn = connect(
       ...
       ssl_verify_cert=conn.extra_dejson.get("ssl_verify_cert", True),
   )
   ```
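The lookup above uses `dict.get` with a default of `True`, so certificate verification stays on unless the extra explicitly disables it. The stdlib-only sketch below walks through the cases; `resolve_ssl_verify_cert` is a hypothetical helper that mirrors the one-line lookup and is not part of the hook. Only a JSON boolean `false` turns verification off. A quoted `"false"` comes back as a non-empty, truthy string and, assuming pydruid forwards the value to `requests`' `verify` argument, would be read as a CA bundle path rather than as a boolean.

```python
import json


def resolve_ssl_verify_cert(extra_json: str):
    """Hypothetical helper mirroring the PR's lookup:
    conn.extra_dejson.get("ssl_verify_cert", True)."""
    extra = json.loads(extra_json) if extra_json else {}
    # A missing key falls back to True, so verification is on by default.
    return extra.get("ssl_verify_cert", True)


# JSON true / false map to Python True / False.
print(resolve_ssl_verify_cert('{"endpoint": "/druid/v2/sql", "ssl_verify_cert": true}'))   # True
print(resolve_ssl_verify_cert('{"endpoint": "/druid/v2/sql", "ssl_verify_cert": false}'))  # False
# No key at all: the default applies.
print(resolve_ssl_verify_cert('{"endpoint": "/druid/v2/sql"}'))                            # True
# Pitfall: a quoted "false" is a non-empty string, which is truthy.
print(bool(resolve_ssl_verify_cert('{"ssl_verify_cert": "false"}')))                       # True
```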
   
   # What
   
   ## ssl_verify_cert
   
   Create connections with the following commands:
   
   ```console
   airflow connections add 'druid_ssl_true' \
     --conn-type 'druid' \
     --conn-host 'broker' \
     --conn-port '8082' \
     --conn-schema 'https' \
     --conn-extra '{"endpoint": "/druid/v2/sql", "ssl_verify_cert": true}'
   
   airflow connections add 'druid_ssl_false' \
     --conn-type 'druid' \
     --conn-host 'broker' \
     --conn-port '8082' \
     --conn-schema 'https' \
     --conn-extra '{"endpoint": "/druid/v2/sql", "ssl_verify_cert": false}'
   ```
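Hand-quoting the JSON passed to `--conn-extra` is easy to get wrong. As a sketch, the same two invocations can be generated with `json.dumps`, which emits real JSON booleans (`true`/`false`, not strings), and `shlex.join`, which quotes each argument for a POSIX shell. `build_add_command` is a hypothetical helper; it only builds and prints the commands and does not run them.

```python
import json
import shlex


def build_add_command(conn_id: str, ssl_verify_cert: bool) -> list[str]:
    """Hypothetical helper: builds the `airflow connections add` argv shown above."""
    extra = {"endpoint": "/druid/v2/sql", "ssl_verify_cert": ssl_verify_cert}
    return [
        "airflow", "connections", "add", conn_id,
        "--conn-type", "druid",
        "--conn-host", "broker",
        "--conn-port", "8082",
        "--conn-schema", "https",
        # json.dumps writes a JSON boolean, so the hook receives a bool, not a string.
        "--conn-extra", json.dumps(extra),
    ]


for conn_id, verify in [("druid_ssl_true", True), ("druid_ssl_false", False)]:
    # shlex.join quotes the JSON argument so it survives the shell intact.
    print(shlex.join(build_add_command(conn_id, verify)))
```

The printed lines can be pasted into a shell, or the argv lists can be handed to `subprocess.run` directly, which skips shell quoting altogether.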
   
   Create a DAG to test `ssl_verify_cert`:
   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook
   from datetime import datetime
   from airflow.exceptions import AirflowException
   import logging
   
   def test_druid_ssl_verify(conn_id):
       hook = DruidDbApiHook(druid_broker_conn_id=conn_id)
       # Pre-bind conn so the finally block is safe even if get_conn() raises.
       conn = None
       try:
           conn = hook.get_conn()
           logging.info('=== conn.ssl_verify_cert ===')
           logging.info(conn.ssl_verify_cert)
           cur = conn.cursor()
           cur.execute("SELECT 1")
           result = cur.fetchall()
           logging.info("Druid query result: %s", result)
       except Exception as e:
           logging.error("Connection failed: %s", str(e))
           raise AirflowException(f"SSL test failed for connection {conn_id}: {e}") from e
       finally:
           if conn is not None:
               conn.close()
   
   with DAG(
       dag_id="test_druid_ssl_verify",
       start_date=datetime(2025, 7, 1),
       schedule=None,
       catchup=False,
       tags=["druid", "ssl-test"],
   ) as dag:
   
       test_conn_ssl_true = PythonOperator(
           task_id="test_ssl_verify_true",
           python_callable=test_druid_ssl_verify,
           op_args=["druid_ssl_true"],
       )
   
       test_conn_ssl_false = PythonOperator(
           task_id="test_ssl_verify_false",
           python_callable=test_druid_ssl_verify,
           op_args=["druid_ssl_false"],
       )
   
       test_conn_ssl_true >> test_conn_ssl_false
   ```
   
   Result (matches expectation):
   + `ssl_verify_cert=true`
   <img width="1905" alt="druid_ssl_true" src="https://github.com/user-attachments/assets/faad3e75-b494-4a98-b6a4-2770108cd27d" />
   
   + `ssl_verify_cert=false`
   <img width="1904" alt="druid_ssl_false" src="https://github.com/user-attachments/assets/0dd1d1e7-71a6-405c-ade0-46136e8ef9fd" />
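For readers wondering what the flag toggles at the TLS layer: assuming pydruid hands `ssl_verify_cert` to `requests` as its `verify` argument, `True` means the broker's certificate chain and hostname are checked against trusted CAs, while `False` skips both checks. The stdlib `ssl` sketch below shows the two equivalent client-side settings. It is illustrative only: `make_client_context` is a hypothetical name, and this is not the code path `requests` actually uses.

```python
import ssl


def make_client_context(verify_cert: bool) -> ssl.SSLContext:
    """Hypothetical helper: client TLS settings equivalent to verify on/off."""
    # Defaults: CERT_REQUIRED plus hostname checking against the system CA store.
    ctx = ssl.create_default_context()
    if not verify_cert:
        # Order matters: hostname checking must be disabled before CERT_NONE is allowed.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


for flag in (True, False):
    ctx = make_client_context(flag)
    print(flag, ctx.verify_mode, ctx.check_hostname)
```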
   
   ## get_pandas_df
   
   Example DAG:
   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook
   from datetime import datetime
   import logging
   
   def test_druid_get_pandas_df(conn_id: str):
       hook = DruidDbApiHook(druid_broker_conn_id=conn_id)
       try:
           df = hook.get_pandas_df("SELECT 1")
           logging.info("Druid returned dataframe:\n%s", df)
       except Exception as e:
           logging.error("Druid get_pandas_df failed: %s", e)
           raise
   
   with DAG(
       dag_id="test_druid_get_pandas_df",
       start_date=datetime(2025, 7, 1),
       schedule=None,
       catchup=False,
       tags=["druid", "ssl", "get_pandas_df"],
   ) as dag:
   
       run_query = PythonOperator(
           task_id="run_druid_query",
           python_callable=test_druid_get_pandas_df,
           op_args=["druid_ssl_false"],
       )
   ```
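Conceptually, `get_pandas_df` runs the query over the hook's DB-API connection and builds a DataFrame from the returned rows plus the column names in `cursor.description`; the common SQL hook delegates that last step to pandas. The sketch below reproduces the DB-API part with the stdlib only: `sqlite3` stands in for the Druid connection, plain dicts stand in for the DataFrame, and `fetch_records_with_columns` is a hypothetical name, not a hook method.

```python
import sqlite3
from contextlib import closing


def fetch_records_with_columns(conn, sql: str) -> list[dict]:
    """Hypothetical helper: DB-API query -> list of {column: value} rows."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        # cursor.description has one 7-item sequence per column; item 0 is the name.
        columns = [col[0] for col in cur.description]
        return [dict(zip(columns, row)) for row in cur.fetchall()]


# sqlite3 stands in for the Druid broker connection here.
with closing(sqlite3.connect(":memory:")) as conn:
    print(fetch_records_with_columns(conn, "SELECT 1 AS value"))  # [{'value': 1}]
```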
   
   Result:
   <img width="1909" alt="druid_pd_df" src="https://github.com/user-attachments/assets/8b96bce7-ad28-4816-a129-1e987f9153a1" />
   

