This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6b5a4aac6b Sanitize keyspace and table in the Cassandra query to avoid SQL injection (#36111)
6b5a4aac6b is described below
commit 6b5a4aac6bc8fcb4a9af8f07020f5093a643efcc
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Dec 16 20:29:28 2023 +0100
Sanitize keyspace and table in the Cassandra query to avoid SQL injection (#36111)
* Use query params to define the Cassandra keyspace and table in the query
* Follow the same pattern used for keys (`%(...)s` placeholders)
* Switch to a different approach: validate keyspace and table names against the regex `^\w+$`
---
airflow/providers/apache/cassandra/hooks/cassandra.py | 14 ++++++++++++--
.../providers/apache/cassandra/hooks/test_cassandra.py | 17 +++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/apache/cassandra/hooks/cassandra.py
b/airflow/providers/apache/cassandra/hooks/cassandra.py
index 999782e0da..66d3d13aa8 100644
--- a/airflow/providers/apache/cassandra/hooks/cassandra.py
+++ b/airflow/providers/apache/cassandra/hooks/cassandra.py
@@ -18,6 +18,7 @@
"""This module contains hook to integrate with Apache Cassandra."""
from __future__ import annotations
+import re
from typing import Any, Union
from cassandra.auth import PlainTextAuthProvider
@@ -188,6 +189,13 @@ class CassandraHook(BaseHook, LoggingMixin):
cluster_metadata = self.get_conn().cluster.metadata
return keyspace in cluster_metadata.keyspaces and table in
cluster_metadata.keyspaces[keyspace].tables
+ @staticmethod
+ def _sanitize_input(input_string: str) -> str:
+ if re.match(r"^\w+$", input_string):
+ return input_string
+ else:
+ raise ValueError(f"Invalid input: {input_string}")
+
def record_exists(self, table: str, keys: dict[str, str]) -> bool:
"""
Check if a record exists in Cassandra.
@@ -196,9 +204,11 @@ class CassandraHook(BaseHook, LoggingMixin):
Use dot notation to target a specific keyspace.
:param keys: The keys and their values to check the existence.
"""
- keyspace = self.keyspace
+ keyspace = self._sanitize_input(self.keyspace) if self.keyspace else
self.keyspace
if "." in table:
- keyspace, table = table.split(".", 1)
+ keyspace, table = map(self._sanitize_input, table.split(".", 1))
+ else:
+ table = self._sanitize_input(table)
ks_str = " AND ".join(f"{key}=%({key})s" for key in keys)
query = f"SELECT * FROM {keyspace}.{table} WHERE {ks_str}"
try:
diff --git
a/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
b/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
index c100e55bdb..c27b72d369 100644
--- a/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
+++ b/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import re
from unittest import mock
import pytest
@@ -232,3 +233,19 @@ class TestCassandraHook:
session.shutdown()
hook.shutdown_cluster()
+
+ def test_possible_sql_injection(self):
+ hook = CassandraHook("cassandra_default_with_schema")
+ session = hook.get_conn()
+ cqls = [
+ "DROP TABLE IF EXISTS t",
+ "CREATE TABLE t (pk1 text, pk2 text, c text, PRIMARY KEY (pk1,
pk2))",
+ "INSERT INTO t (pk1, pk2, c) VALUES ('foo', 'bar', 'baz')",
+ ]
+ for cql in cqls:
+ session.execute(cql)
+
+ assert hook.record_exists("t", {"pk1": "foo", "pk2": "bar"})
+ assert not hook.record_exists("tt", {"pk1": "foo", "pk2": "bar"})
+ with pytest.raises(ValueError, match=re.escape("Invalid input: t; DROP
TABLE t; SELECT * FROM t")):
+ hook.record_exists("t; DROP TABLE t; SELECT * FROM t", {"pk1":
"foo", "pk2": "baz"})