This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b5a4aac6b Sanitize keyspace and table in the cassandra query to avoid sql injection (#36111)
6b5a4aac6b is described below

commit 6b5a4aac6bc8fcb4a9af8f07020f5093a643efcc
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Dec 16 20:29:28 2023 +0100

    Sanitize keyspace and table in the cassandra query to avoid sql injection (#36111)
    
    * Use query params to define cassandra keyspace and table in the query
    
    * Follow the same pattern used for keys %(...)s
    
    * Switch to different approach
---
 airflow/providers/apache/cassandra/hooks/cassandra.py   | 14 ++++++++++++--
 .../providers/apache/cassandra/hooks/test_cassandra.py  | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/apache/cassandra/hooks/cassandra.py b/airflow/providers/apache/cassandra/hooks/cassandra.py
index 999782e0da..66d3d13aa8 100644
--- a/airflow/providers/apache/cassandra/hooks/cassandra.py
+++ b/airflow/providers/apache/cassandra/hooks/cassandra.py
@@ -18,6 +18,7 @@
 """This module contains hook to integrate with Apache Cassandra."""
 from __future__ import annotations
 
+import re
 from typing import Any, Union
 
 from cassandra.auth import PlainTextAuthProvider
@@ -188,6 +189,13 @@ class CassandraHook(BaseHook, LoggingMixin):
         cluster_metadata = self.get_conn().cluster.metadata
         return keyspace in cluster_metadata.keyspaces and table in cluster_metadata.keyspaces[keyspace].tables
 
+    @staticmethod
+    def _sanitize_input(input_string: str) -> str:
+        if re.match(r"^\w+$", input_string):
+            return input_string
+        else:
+            raise ValueError(f"Invalid input: {input_string}")
+
     def record_exists(self, table: str, keys: dict[str, str]) -> bool:
         """
         Check if a record exists in Cassandra.
@@ -196,9 +204,11 @@ class CassandraHook(BaseHook, LoggingMixin):
                       Use dot notation to target a specific keyspace.
         :param keys: The keys and their values to check the existence.
         """
-        keyspace = self.keyspace
+        keyspace = self._sanitize_input(self.keyspace) if self.keyspace else self.keyspace
         if "." in table:
-            keyspace, table = table.split(".", 1)
+            keyspace, table = map(self._sanitize_input, table.split(".", 1))
+        else:
+            table = self._sanitize_input(table)
         ks_str = " AND ".join(f"{key}=%({key})s" for key in keys)
         query = f"SELECT * FROM {keyspace}.{table} WHERE {ks_str}"
         try:
diff --git a/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py b/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
index c100e55bdb..c27b72d369 100644
--- a/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
+++ b/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import re
 from unittest import mock
 
 import pytest
@@ -232,3 +233,19 @@ class TestCassandraHook:
 
         session.shutdown()
         hook.shutdown_cluster()
+
+    def test_possible_sql_injection(self):
+        hook = CassandraHook("cassandra_default_with_schema")
+        session = hook.get_conn()
+        cqls = [
+            "DROP TABLE IF EXISTS t",
+            "CREATE TABLE t (pk1 text, pk2 text, c text, PRIMARY KEY (pk1, pk2))",
+            "INSERT INTO t (pk1, pk2, c) VALUES ('foo', 'bar', 'baz')",
+        ]
+        for cql in cqls:
+            session.execute(cql)
+
+        assert hook.record_exists("t", {"pk1": "foo", "pk2": "bar"})
+        assert not hook.record_exists("tt", {"pk1": "foo", "pk2": "bar"})
+        with pytest.raises(ValueError, match=re.escape("Invalid input: t; DROP TABLE t; SELECT * FROM t")):
+            hook.record_exists("t; DROP TABLE t; SELECT * FROM t", {"pk1": "foo", "pk2": "baz"})

Reply via email to