This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e396f06041 Pass SSL arg to all requests in DruidOperator (#39066)
e396f06041 is described below

commit e396f06041e51691ae09751c23ef57fd9a03df22
Author: Daniel Bell <70265071+danielb...@users.noreply.github.com>
AuthorDate: Sun May 5 08:19:50 2024 +0200

    Pass SSL arg to all requests in DruidOperator (#39066)
    
    * Pass SSL arg to all requests in DruidOperator
    
    * Remove unneeded test
    
    * Lint
    
    * Fix test
    
    * Fix tests
    
    * Add true test as per dirrao's comment
    
    * Use call_count == 1
    
    ---------
    
    Co-authored-by: Daniel Bell <daniel.b...@skyscanner.net>
---
 airflow/providers/apache/druid/hooks/druid.py    |  6 ++-
 tests/providers/apache/druid/hooks/test_druid.py | 60 ++++++++++++++++++++----
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py
index da678d0153..a79b494f32 100644
--- a/airflow/providers/apache/druid/hooks/druid.py
+++ b/airflow/providers/apache/druid/hooks/druid.py
@@ -144,13 +144,15 @@ class DruidHook(BaseHook):
 
         sec = 0
         while running:
-            req_status = requests.get(druid_task_status_url, auth=self.get_auth())
+            req_status = requests.get(druid_task_status_url, auth=self.get_auth(), verify=self.get_verify())
 
             self.log.info("Job still running for %s seconds...", sec)
 
             if self.max_ingestion_time and sec > self.max_ingestion_time:
                 # ensure that the job gets killed if the max ingestion time is exceeded
-                requests.post(f"{url}/{druid_task_id}/shutdown", auth=self.get_auth())
+                requests.post(
+                    f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(), verify=self.get_verify()
+                )
                 raise AirflowException(f"Druid ingestion took more than {self.max_ingestion_time} seconds")
 
             time.sleep(self.timeout)
diff --git a/tests/providers/apache/druid/hooks/test_druid.py b/tests/providers/apache/druid/hooks/test_druid.py
index 2b40264455..0d42695f06 100644
--- a/tests/providers/apache/druid/hooks/test_druid.py
+++ b/tests/providers/apache/druid/hooks/test_druid.py
@@ -99,25 +99,69 @@ class TestDruidSubmitHook:
         assert task_post.call_count == 1
         assert status_check.call_count == 1
 
-    def test_submit_with_correct_ssl_arg(self, requests_mock):
+    def test_submit_with_false_ssl_arg(self, requests_mock):
+        # Timeout so that all three requests are sent
+        self.db_hook.timeout = 1
+        self.db_hook.max_ingestion_time = 5
         self.db_hook.verify_ssl = False
+
         task_post = requests_mock.post(
             "http://druid-overlord:8081/druid/indexer/v1/task",
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
         )
         status_check = requests_mock.get(
             "http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
-            text='{"status":{"status": "SUCCESS"}}',
+            text='{"status":{"status": "RUNNING"}}',
+        )
+        shutdown_post = requests_mock.post(
+            "http://druid-overlord:8081/druid/indexer/v1/task/"
+            "9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
         )
 
-        self.db_hook.submit_indexing_job("Long json file")
+        with pytest.raises(AirflowException):
+            self.db_hook.submit_indexing_job("Long json file")
 
-        # PGH005: false positive on ``requests_mock`` argument `called_once`
         assert task_post.call_count == 1
-        assert status_check.call_count == 1
-        if task_post.called_once:
-            verify_ssl = task_post.request_history[0].verify
-            assert False is verify_ssl
+        assert False is task_post.request_history[0].verify
+
+        assert status_check.call_count > 1
+        assert False is status_check.request_history[0].verify
+
+        assert shutdown_post.call_count == 1
+        assert False is shutdown_post.request_history[0].verify
+
+    def test_submit_with_true_ssl_arg(self, requests_mock):
+        # Timeout so that all three requests are sent
+        self.db_hook.timeout = 1
+        self.db_hook.max_ingestion_time = 5
+        self.db_hook.verify_ssl = True
+
+        task_post = requests_mock.post(
+            "http://druid-overlord:8081/druid/indexer/v1/task",
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
+        )
+        status_check = requests_mock.get(
+            "http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
+            text='{"status":{"status": "RUNNING"}}',
+        )
+        shutdown_post = requests_mock.post(
+            "http://druid-overlord:8081/druid/indexer/v1/task/"
+            "9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
+        )
+
+        with pytest.raises(AirflowException):
+            self.db_hook.submit_indexing_job("Long json file")
+
+        assert task_post.call_count == 1
+        assert True is task_post.request_history[0].verify
+
+        assert status_check.call_count > 1
+        assert True is status_check.request_history[0].verify
+
+        assert shutdown_post.call_count == 1
+        assert True is shutdown_post.request_history[0].verify
 
     def test_submit_correct_json_body(self, requests_mock):
         task_post = requests_mock.post(

Reply via email to