rmidhun23 opened a new issue, #52723: URL: https://github.com/apache/airflow/issues/52723
### Apache Airflow Provider(s)

neo4j

### Versions of Apache Airflow Providers

apache-airflow-providers-http==5.3.0
apache-airflow-providers-cncf-kubernetes==10.5.0
apache-airflow-providers-neo4j==3.9.0

### Apache Airflow version

2.10.5

### Operating System

macOS Sequoia Version 15.4.1

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

```
Docker:
  Version: 20.10.14, build a224086
Kubernetes:
  Client Version: v1.22.5
  Server Version: v1.33.1
kind:
  Version: v0.29.0 go1.24.2 darwin/arm64
airflow (Helm):
  Chart: airflow-1.16.0
  App Version: 2.10.5
neo4j (Helm):
  Chart: neo4j-2025.4.0
  App Version: 2025.04.0
```

### What happened

When using the `Neo4jOperator` to execute Cypher queries, the `parameters` argument is not passed through to the query. The operator accepts a `parameters` argument, but the argument is [not used](https://github.com/apache/airflow/blob/main/providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py#L65) when [executing the query](https://github.com/apache/airflow/blob/main/providers/neo4j/src/airflow/providers/neo4j/hooks/neo4j.py#L113): as the traceback below shows, the operator calls `hook.run(self.sql)` and the hook calls `session.run(query)`, so no parameter map ever reaches Neo4j. As a result, the query is rejected and the task fails with a `neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: error: syntax error or access rule violation - missing request parameter` exception.

<details>
<summary>Error</summary>

```
.....
.....
.....
[2025-07-01T16:34:41.181+0000] {base.py:84} INFO - Retrieving connection 'bolt_neo4j'
[2025-07-01T16:34:41.187+0000] {neo4j.py:61} INFO - URI: bolt://neo4j.neo4j.svc.cluster.local:7687
[2025-07-01T16:34:41.294+0000] {taskinstance.py:3313} ERROR - Task failed with exception
neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: error: syntax error or access rule violation - missing request parameter .
Expected $`name`, but got .} {message: 42N81: Expected $`name`, but got .} {diagnostic_record: {'_classification': 'CLIENT_ERROR', 'OPERATION': '', 'OPERATION_CODE': '0', 'CURRENT_SCHEMA': '/'}} {raw_classification: CLIENT_ERROR}

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/operators/neo4j.py", line 65, in execute
    hook.run(self.sql)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/hooks/neo4j.py", line 127, in run
    result = session.run(query)
             ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/session.py", line 328, in run
    self._auto_result._run(
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 236, in _run
    self._attach()
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 430, in _attach
    self._connection.fetch_message()
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 184, in inner
    func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt.py", line 864, in fetch_message
    res = self._process_message(tag, fields)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt5.py", line 1208, in _process_message
    response.on_failure(summary_metadata or {})
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 254, in on_failure
    raise self._hydrate_error(metadata)
neo4j.exceptions.ClientError: {code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): name}
[2025-07-01T16:34:41.452+0000] {taskinstance.py:1226} INFO - Marking task as UP_FOR_RETRY. dag_id=test_graph_pipeline, task_id=up_graph_db, run_id=manual__2025-07-01T15:53:36.426021+00:00, execution_date=20250701T155336, start_date=20250701T163439, end_date=20250701T163441
[2025-07-01T16:34:41.786+0000] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-07-01T16:34:41.788+0000] {standard_task_runner.py:124} ERROR - Failed to execute job 385 for task up_graph_db ({code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): name}; 41)
neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: error: syntax error or access rule violation - missing request parameter .
Expected $`name`, but got .} {message: 42N81: Expected $`name`, but got .} {diagnostic_record: {'_classification': 'CLIENT_ERROR', 'OPERATION': '', 'OPERATION_CODE': '0', 'CURRENT_SCHEMA': '/'}} {raw_classification: CLIENT_ERROR}

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3006, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 274, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3161, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3185, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/operators/neo4j.py", line 65, in execute
    hook.run(self.sql)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/hooks/neo4j.py", line 127, in run
    result = session.run(query)
             ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/session.py", line 328, in run
    self._auto_result._run(
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 236, in _run
    self._attach()
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 430, in _attach
    self._connection.fetch_message()
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 184, in inner
    func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt.py", line 864, in fetch_message
    res = self._process_message(tag, fields)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt5.py", line 1208, in _process_message
    response.on_failure(summary_metadata or {})
  File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 254, in on_failure
    raise self._hydrate_error(metadata)
neo4j.exceptions.ClientError: {code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): name}
[2025-07-01T16:34:41.826+0000] {local_task_job_runner.py:266} INFO - Task exited with return code 1
[2025-07-01T16:34:41.986+0000] {local_task_job_runner.py:245} INFO - ::endgroup::
```

</details>

### What you think should happen instead

The `Neo4jOperator` should pass the `parameters` argument through to the Cypher query execution. This would allow dynamic queries that accept parameters at runtime, and `parameters` should accept either plain values or an `XComArg` from upstream tasks.

### How to reproduce

1. Deploy `airflow` and `neo4j` using Helm charts.
2. Run a DAG with a `Neo4jOperator` that sets the `parameters` argument and a query that uses it.
3. The `Neo4jOperator` task fails with error code [GQLSTATUS:42N81](https://neo4j.com/docs/status-codes/current/errors/gql-errors/#_42n81) due to missing parameters.
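For the expected behaviour described above, here is a minimal sketch of the forwarding the operator and hook would need. `FakeSession`, `SketchNeo4jHook`, `SketchNeo4jOperator`, and `ParameterMissing` are hypothetical stand-ins, not the provider's or driver's classes. The fake session only mimics the server-side check behind 42N81 / `ParameterMissing`, using a simplified regex for `$name` and `` $`name` `` references. Its `run(query, parameters=None, **kwargs)` signature follows the neo4j Python driver's `Session.run`. The two comments mark where, per the traceback, the current provider code drops `parameters`.

```python
import re
from typing import Any, Optional

# Cypher parameter references: $name or $`quoted name`.
# Simplification: string literals and comments are not skipped.
PARAM_RE = re.compile(r"\$(?:`([^`]+)`|([A-Za-z_][A-Za-z0-9_]*))")


class ParameterMissing(Exception):
    """Stand-in for Neo.ClientError.Statement.ParameterMissing."""


class FakeSession:
    """Hypothetical stand-in for neo4j.Session; only mimics the parameter check."""

    def run(
        self, query: str, parameters: Optional[dict[str, Any]] = None, **kwargs: Any
    ) -> dict[str, Any]:
        # Like the real driver, keyword arguments take precedence over `parameters`.
        params = {**(parameters or {}), **kwargs}
        referenced = {quoted or plain for quoted, plain in PARAM_RE.findall(query)}
        missing = sorted(referenced - set(params))
        if missing:
            raise ParameterMissing(f"Expected parameter(s): {', '.join(missing)}")
        return params  # a real session would return a Result


class SketchNeo4jHook:
    """Hypothetical hook whose run() accepts and forwards an optional parameter map."""

    def __init__(self, session: FakeSession) -> None:
        self.session = session

    def run(self, query: str, parameters: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        # The current provider hook calls session.run(query) at this point.
        return self.session.run(query, parameters)


class SketchNeo4jOperator:
    """Hypothetical operator that keeps `parameters` and hands it to the hook."""

    def __init__(self, sql: str, parameters: Optional[dict[str, Any]] = None) -> None:
        self.sql = sql
        self.parameters = parameters

    def execute(self, hook: SketchNeo4jHook) -> dict[str, Any]:
        # The current provider operator calls hook.run(self.sql) at this point.
        return hook.run(self.sql, self.parameters)


query = "MERGE (a:Person {name: $name})"

# Forwarded: $name is bound, so the call succeeds.
op = SketchNeo4jOperator(sql=query, parameters={"name": "Airflow"})
print(op.execute(SketchNeo4jHook(FakeSession())))  # {'name': 'Airflow'}

# Not forwarded (what session.run(query) does today): the call fails.
try:
    FakeSession().run(query)
except ParameterMissing as exc:
    print(exc)  # Expected parameter(s): name
```

In the reproduction DAG, `Neo4jOperator(parameters={"name": "Airflow"})` corresponds to the first call, and the provider's current behaviour corresponds to the second. Making `parameters` an optional keyword on the hook keeps existing `hook.run(query)` callers working. Whether the upstream fix takes exactly this shape is an assumption.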
```python
import logging
import json
from datetime import datetime

# pylint: disable=import-error
from airflow.decorators import dag
from airflow.models import Connection, Variable
from airflow.utils.session import provide_session
from airflow.operators.empty import EmptyOperator
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@provide_session
def create_or_update_connection(
    conn_id, conn_type, host, schema=None, port=None, extra=None, session=None
):
    """
    Create or update a connection in Airflow.
    """
    conn = session.query(Connection).filter(Connection.conn_id == conn_id).first()
    if conn is None:
        conn = Connection(
            conn_id=conn_id,
            conn_type=conn_type,
            description=None,
            host=host,
            login=None,
            password=None,
            schema=schema,
            port=port,
            extra=extra,
        )
        session.add(conn)
        session.commit()
        logger.info("create_or_update_connection: Connection %s created", conn_id)
    else:
        logger.info(
            "create_or_update_connection: Connection %s already exists", conn_id
        )
        conn.conn_type = conn_type
        conn.host = host
        conn.schema = schema
        conn.port = port
        conn.extra = extra
        session.commit()
    session.close()


def merge_neo4j_bolt_connection(var):
    """
    Create or update the Neo4j bolt connection in Airflow.
    """
    ## Assumes neo4j standalone deployment
    ## Using bolt unsecured on 7687
    conn_id = var.get("conn_id", "bolt_neo4j")
    conn_type = var.get("conn_type", "neo4j")
    port = int(var.get("port", 7687))
    host = var.get("host", "neo4j.neo4j.svc.cluster.local")
    schema = var.get("schema", "neo4j")
    extra_options = var.get("extra", json.loads("""{ "encrypted": false }"""))
    create_or_update_connection(
        conn_id=conn_id,
        conn_type=conn_type,
        host=host,
        schema=schema,
        port=port,
        extra=json.dumps(extra_options),
    )


# Merge the connection.
# default_var is returned as-is (not JSON-decoded), so pass a dict, not "{}".
neo4j_bolt_vars = Variable.get("bolt_neo4j", default_var={}, deserialize_json=True)
merge_neo4j_bolt_connection(neo4j_bolt_vars)


# Define the DAG
@dag(
    dag_id="test_graph_pipeline",
    description="Neo4j provider sample",
    default_args={
        "owner": "airflow",
        "start_date": datetime(2025, 5, 25),
        "retries": 3,
    },
    schedule_interval=None,
    start_date=datetime(2025, 6, 23),
    catchup=False,
    tags=["airflow", "neo4j provider"],
)
def test_graph_pipeline():
    """
    DAG to test parameters binding using Neo4jProvider in Airflow.
    """
    start = EmptyOperator(task_id="start")

    # `parameters` is accepted here, but it never reaches session.run(),
    # so $name is unbound and the task fails with 42N81.
    update_graph_db = Neo4jOperator(
        task_id="up_graph_db",
        parameters={"name": "Airflow"},
        neo4j_conn_id=neo4j_bolt_vars.get("conn_id", "bolt_neo4j"),
        sql="""
        USE neo4j
        MERGE (a:Person {name: $name})
        ON CREATE SET a.created = timestamp()
        ON MATCH SET a.updated = timestamp()
        """,
    )

    end = EmptyOperator(task_id="end")

    start >> update_graph_db >> end


DAG_INSTANCE = test_graph_pipeline()
```
### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
