rmidhun23 opened a new issue, #52723:
URL: https://github.com/apache/airflow/issues/52723

   ### Apache Airflow Provider(s)
   
   neo4j
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-http==5.3.0
   apache-airflow-providers-cncf-kubernetes==10.5.0
   apache-airflow-providers-neo4j==3.9.0
   
   ### Apache Airflow version
   
   2.10.5
   
   ### Operating System
   
   macOS Sequoia Version 15.4.1
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   ```
   Docker: 
   Version: 20.10.14, build a224086
   
   Kubernetes:
   Client Version: v1.22.5
   Server Version: v1.33.1
   
   kind:
   Version: v0.29.0 go1.24.2 darwin/arm64
   
   airflow (Helm):
   Chart:  airflow-1.16.0  App Version: 2.10.5
   
   neo4j (Helm):
   Chart: neo4j-2025.4.0  App Version: 2025.04.0 
   ```
   
   ### What happened
   
   When the `Neo4jOperator` executes a Cypher query, the `parameters` 
argument is not passed to the query. The operator accepts a `parameters` 
argument, but that argument is [not 
used](https://github.com/apache/airflow/blob/main/providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py#L65)
 when [executing the 
query](https://github.com/apache/airflow/blob/main/providers/neo4j/src/airflow/providers/neo4j/hooks/neo4j.py#L113).
 As a result, query execution fails and the task eventually fails with a 
`neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: error: 
syntax error or access rule violation - missing request parameter` exception.
   
   <details>
     <summary>Error</summary>
   .....
   .....
   .....
   
   [2025-07-01T16:34:41.181+0000] {base.py:84} INFO - Retrieving connection 
'bolt_neo4j'
   [2025-07-01T16:34:41.187+0000] {neo4j.py:61} INFO - URI: 
bolt://neo4j.neo4j.svc.cluster.local:7687
   [2025-07-01T16:34:41.294+0000] {taskinstance.py:3313} ERROR - Task failed 
with exception
   neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: 
error: syntax error or access rule violation - missing request parameter . 
Expected $`name`, but got .} {message: 42N81: Expected $`name`, but got .} 
{diagnostic_record: {'_classification': 'CLIENT_ERROR', 'OPERATION': '', 
'OPERATION_CODE': '0', 'CURRENT_SCHEMA': '/'}} {raw_classification: 
CLIENT_ERROR}
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 768, in _execute_task
   result = _execute_callable(context=context, **execute_callable_kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 734, in _execute_callable
   return ExecutionCallableRunner(
   ^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py",
 line 252, in run
   return self.func(*args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py",
 line 424, in wrapper
   return func(self, *args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/operators/neo4j.py",
 line 65, in execute
   hook.run(self.sql)
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/hooks/neo4j.py",
 line 127, in run
   result = session.run(query)
   ^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/session.py",
 line 328, in run
   self._auto_result._run(
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", 
line 236, in _run
   self._attach()
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", 
line 430, in _attach
   self._connection.fetch_message()
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", 
line 184, in inner
   func(*args, **kwargs)
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt.py", 
line 864, in fetch_message
   res = self._process_message(tag, fields)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt5.py", 
line 1208, in _process_message
   response.on_failure(summary_metadata or {})
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", 
line 254, in on_failure
   raise self._hydrate_error(metadata)
   neo4j.exceptions.ClientError: {code: 
Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): 
name}
   [2025-07-01T16:34:41.452+0000] {taskinstance.py:1226} INFO - Marking task as 
UP_FOR_RETRY. dag_id=test_graph_pipeline, task_id=up_graph_db, 
run_id=manual__2025-07-01T15:53:36.426021+00:00, 
execution_date=20250701T155336, start_date=20250701T163439, 
end_date=20250701T163441
   [2025-07-01T16:34:41.786+0000] {taskinstance.py:341} INFO - ::group::Post 
task execution logs
   [2025-07-01T16:34:41.788+0000] {standard_task_runner.py:124} ERROR - Failed 
to execute job 385 for task up_graph_db ({code: 
Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): 
name}; 41)
   neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: 
error: syntax error or access rule violation - missing request parameter . 
Expected $`name`, but got .} {message: 42N81: Expected $`name`, but got .} 
{diagnostic_record: {'_classification': 'CLIENT_ERROR', 'OPERATION': '', 
'OPERATION_CODE': '0', 'CURRENT_SCHEMA': '/'}} {raw_classification: 
CLIENT_ERROR}
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py",
 line 117, in _start_by_fork
   ret = args.func(args, dag=self.dag)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
   return func(*args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
   return f(*args, **kwargs)
   ^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py",
 line 483, in task_run
   task_return_code = _run_task_by_selected_method(args, _dag, ti)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py",
 line 256, in _run_task_by_selected_method
   return _run_raw_task(args, ti)
   ^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py",
 line 341, in _run_raw_task
   return ti._run_raw_task(
   ^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
   return func(*args, session=session, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 3006, in _run_raw_task
   return _run_raw_task(
   ^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 274, in _run_raw_task
   TaskInstance._execute_task_with_callbacks(
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 3161, in _execute_task_with_callbacks
   result = self._execute_task(context, task_orig)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 3185, in _execute_task
   return _execute_task(self, context, task_orig)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 768, in _execute_task
   result = _execute_callable(context=context, **execute_callable_kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 734, in _execute_callable
   return ExecutionCallableRunner(
   ^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py",
 line 252, in run
   return self.func(*args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py",
 line 424, in wrapper
   return func(self, *args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/operators/neo4j.py",
 line 65, in execute
   hook.run(self.sql)
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/hooks/neo4j.py",
 line 127, in run
   result = session.run(query)
   ^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/session.py",
 line 328, in run
   self._auto_result._run(
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", 
line 236, in _run
   self._attach()
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", 
line 430, in _attach
   self._connection.fetch_message()
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", 
line 184, in inner
   func(*args, **kwargs)
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt.py", 
line 864, in fetch_message
   res = self._process_message(tag, fields)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt5.py", 
line 1208, in _process_message
   response.on_failure(summary_metadata or {})
   File 
"/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", 
line 254, in on_failure
   raise self._hydrate_error(metadata)
   neo4j.exceptions.ClientError: {code: 
Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): 
name}
   [2025-07-01T16:34:41.826+0000] {local_task_job_runner.py:266} INFO - Task 
exited with return code 1
   [2025-07-01T16:34:41.986+0000] {local_task_job_runner.py:245} INFO - 
::endgroup::
   </details>
   
   
![Image](https://github.com/user-attachments/assets/bb8f3ff4-c1f2-4597-b9f2-61bbe59e1d1f)
   
   ### What you think should happen instead
   
   The `Neo4jOperator` should pass the `parameters` argument through to the 
Cypher query execution. This would allow dynamic queries that accept 
parameters at runtime. It should support both plain values and `XComArg` 
references from upstream tasks.
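
The expected behavior can be sketched roughly as follows. This is only an illustration, not the provider's actual code: `FakeSession`, `SketchHook`, and `SketchOperator` are hypothetical stand-ins, and the stub session merely imitates the server-side check for missing `$placeholders`. The one thing taken from the real driver is that `Session.run` accepts the parameters as its second argument.

```python
import re
from typing import Any, Dict, List, Optional, Tuple


class FakeSession:
    """Hypothetical stand-in for a neo4j driver session (no database needed)."""

    def __init__(self) -> None:
        # Every successful call is recorded as (query, parameters).
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def run(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> list:
        params = parameters or {}
        # Imitate the server: every $placeholder in the query needs a value.
        missing = sorted(set(re.findall(r"\$(\w+)", query)) - set(params))
        if missing:
            raise ValueError("Expected parameter(s): " + ", ".join(missing))
        self.calls.append((query, params))
        return []


class SketchHook:
    """Hypothetical hook whose run() accepts parameters and forwards them."""

    def __init__(self, session: FakeSession) -> None:
        self.session = session

    def run(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> list:
        return self.session.run(query, parameters)


class SketchOperator:
    """Hypothetical operator that hands both sql and parameters to the hook."""

    def __init__(
        self,
        sql: str,
        hook: SketchHook,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.sql = sql
        self.hook = hook
        self.parameters = parameters

    def execute(self) -> list:
        # The reported traceback shows hook.run(self.sql); the sketch also
        # forwards the parameters.
        return self.hook.run(self.sql, self.parameters)


QUERY = "MERGE (a:Person {name: $name})"
session = FakeSession()
SketchOperator(sql=QUERY, hook=SketchHook(session), parameters={"name": "Airflow"}).execute()
```

With the parameters forwarded, the stub session receives both the query and `{"name": "Airflow"}`. Dropping the `parameters` argument makes the stub raise, which mirrors the `ParameterMissing` error in the log above.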
   
   ### How to reproduce
   
   1. Deploy `airflow` and `neo4j` using Helm charts.
   2. Run a DAG with `Neo4jOperator` with `parameters` argument and a query 
that uses it.
   3. `Neo4jOperator` task fails with error code 
[GQLSTATUS:42N81](https://neo4j.com/docs/status-codes/current/errors/gql-errors/#_42n81)
 due to missing parameters.
   
   ```python
   import logging
   import json
   from datetime import datetime
   
   # pylint: disable=import-error
   from airflow.decorators import dag
   from airflow.models import Connection, Variable
   from airflow.utils.session import provide_session
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
   
   logging.basicConfig(level=logging.INFO)
   logger = logging.getLogger(__name__)
   
   @provide_session
   def create_or_update_connection(
       conn_id, conn_type, host, schema=None, port=None, extra=None, 
session=None
   ):
       """
       Create or update a connection in Airflow.
       """
   
       conn = session.query(Connection).filter(Connection.conn_id == 
conn_id).first()
   
       if conn is None:
           conn = Connection(
               conn_id=conn_id,
               conn_type=conn_type,
               description=None,
               host=host,
               login=None,
               password=None,
               schema=schema,
               port=port,
               extra=extra,
           )
           session.add(conn)
           session.commit()
           logger.info("create_or_update_connection: Connection %s created", 
conn_id)
       else:
           logger.info(
               "create_or_update_connection: Connection %s already exists", conn_id
           )
           conn.conn_type = conn_type
           conn.host = host
           conn.schema = schema
           conn.port = port
           conn.extra = extra
           session.commit()
       session.close()
   
   
   def merge_neo4j_bolt_connection(var):
       """
       Create or update the Neo4j bolt connection in Airflow.
       """
   
       ## Assumes neo4j standalone deployment
       ## Using bolt unsecured on 7687
       conn_id = var.get("conn_id", "bolt_neo4j")
       conn_type = var.get("conn_type", "neo4j")
       port = int(var.get("port", 7687))
       host = var.get("host", "neo4j.neo4j.svc.cluster.local")
       schema = var.get("schema", "neo4j")
       extra_options = var.get("extra", json.loads("""{ "encrypted": false 
}"""))
   
       create_or_update_connection(
           conn_id=conn_id,
           conn_type=conn_type,
           host=host,
           schema=schema,
           port=port,
           extra=json.dumps(extra_options),
       )
   
   
   # Merge the connection
   neo4j_bolt_vars = Variable.get("bolt_neo4j", default_var="{}", 
deserialize_json=True)
   merge_neo4j_bolt_connection(neo4j_bolt_vars)
   
   
   # Define the DAG
   @dag(
       dag_id="test_graph_pipeline",
       description="Neo4j provider sample",
       default_args={
           "owner": "airflow",
           "start_date": datetime(2025, 5, 25),
           "retries": 3,
       },
       schedule_interval=None,
       start_date=datetime(2025, 6, 23),
       catchup=False,
       tags=["airflow", "neo4j provider"],
   )
   def test_graph_pipeline():
       """
       DAG to test parameters binding using Neo4jProvider in Airflow.
       """
   
       start = EmptyOperator(task_id="start")
   
       update_graph_db = Neo4jOperator(
           task_id="up_graph_db",
           parameters={"name": "Airflow"},
           neo4j_conn_id=neo4j_bolt_vars.get("conn_id", "bolt_neo4j"),
           sql="""
             USE neo4j
             MERGE (a:Person {name: $name})
             ON CREATE SET a.created = timestamp()
             ON MATCH SET a.updated = timestamp()
           """,
       )
   
       end = EmptyOperator(task_id="end")
   
       start >> update_graph_db >> end
   
   
   DAG_INSTANCE = test_graph_pipeline()
   ```
   
   ### Anything else
   
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


