This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f575da4a80d Fix KafkaError.name() called as property instead of method 
in create_topic (#65734)
f575da4a80d is described below

commit f575da4a80dfc1bdd6a421f3421f550f79020c9a
Author: Park Jiwon <[email protected]>
AuthorDate: Fri May 15 02:58:42 2026 +0900

    Fix KafkaError.name() called as property instead of method in create_topic 
(#65734)
---
 .../kafka/src/airflow/providers/apache/kafka/hooks/client.py  |  2 +-
 .../apache/kafka/tests/unit/apache/kafka/hooks/test_client.py | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
index 8df77604737..b26383da5e1 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
@@ -56,7 +56,7 @@ class KafkaAdminClientHook(KafkaBaseHook):
                 f.result()
                 self.log.info("The topic %s has been created.", t)
             except KafkaException as e:
-                if e.args[0].name == "TOPIC_ALREADY_EXISTS":
+                if e.args[0].name() == "TOPIC_ALREADY_EXISTS":
                     self.log.warning("The topic %s already exists.", t)
                 else:
                     raise
diff --git 
a/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py 
b/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
index d56bc17b207..5e3ea7189a6 100644
--- a/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
@@ -85,19 +85,18 @@ class TestKafkaAdminClientHook:
     @patch(
         "airflow.providers.apache.kafka.hooks.base.AdminClient",
     )
-    def test_create_topic_warning(self, admin_client, caplog):
+    def test_create_topic_already_exists_no_exception_but_warning(self, 
admin_client):
         mock_f = MagicMock()
         kafka_exception = KafkaException()
         mock_arg = MagicMock()
-        mock_arg.name = "TOPIC_ALREADY_EXISTS"
+        mock_arg.name.return_value = "TOPIC_ALREADY_EXISTS"
         kafka_exception.args = [mock_arg]
         mock_f.result.side_effect = [kafka_exception]
         admin_client.return_value.create_topics.return_value = {"topic_name": 
mock_f}
-        with caplog.at_level(
-            logging.WARNING, 
logger="airflow.providers.apache.kafka.hooks.client.KafkaAdminClientHook"
-        ):
+
+        with patch.object(self.hook.log, "warning") as mock_warning:
             self.hook.create_topic(topics=[("topic_name", 0, 1)])
-            assert "The topic topic_name already exists" in caplog.text
+            mock_warning.assert_called_once_with("The topic %s already 
exists.", "topic_name")
 
     @patch(
         "airflow.providers.apache.kafka.hooks.base.AdminClient",

Reply via email to