dipesh747 opened a new issue, #27814:
URL: https://github.com/apache/airflow/issues/27814

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   When calling the "Get an XCom entry" endpoint, I get a 404 "XCom entry not found" error. All other endpoints work correctly and the URL is correct. When I manually open the Airflow webserver, find the relevant dag_run, and go into its XCom view, I can see the XCom keys and values.
   
   Airflow version: 2.1.2.
   
   I have tested this in Airflow 2.4.2 and the error does not occur, so it seems to be specific to the earlier version.
   
   ### What you think should happen instead
   
   The "Get an XCom entry" endpoint should return the entry, as described in the documentation.
   
   ### How to reproduce
   
   The Airflow task is an instance of DatabricksSubmitRunOperator.
   In the dag file:
   
   from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
   
   # cluster, databricks_task_notebook_path and _teams_webhook_post are defined
   # elsewhere in the dag file.
   def create_databricks_task(task_id, froms, imports, function='', kwargs='',
                              trigger_rule='all_success'):
       """
   
       Args:
           task_id (str): Globally unique name of the task
           froms (str): Python package name
           imports (str): Module to import
           function (str): Name of the function to run. For scripts this is
               usually main.
           kwargs (str): Keyword arguments for the module
           trigger_rule (str): State required for the task to trigger.
   
       Returns:
           An instance of the class DatabricksSubmitRunOperator
       """
       params = {
           'existing_cluster_id': cluster,
           'notebook_task': {
               'notebook_path': databricks_task_notebook_path,
               'base_parameters': {
                   'from': froms,
                   'import': imports,
                   'function': function,
                   'kwargs': kwargs
               }
           },
       }
       return DatabricksSubmitRunOperator(task_id=task_id, json=params,
                                          trigger_rule=trigger_rule,
                                          on_failure_callback=_teams_webhook_post,
                                          do_xcom_push=True,
                                          run_name=f"{task_id}--{froms}--{imports}")
   
   
   # Generate universe
   task_id = 'generate_universe'
   imports = 'generate_universe'
   kwargs = ""
   froms = 'scripts.drsk.drsk_universe_creation'
   generate_universe = create_databricks_task(task_id=task_id, froms=froms,
                                              imports=imports, kwargs=kwargs)
   
   In Databricks it calls this notebook:
   
   import logging
   
   logging.getLogger("py4j").setLevel(logging.WARN)
   dbutils.widgets.text("import", '')
   dbutils.widgets.text("from", '')
   dbutils.widgets.text("kwargs", '')
   dbutils.widgets.text("function", '')
   imports = dbutils.widgets.get("import")
   froms = dbutils.widgets.get("from")
   kwargs_str = dbutils.widgets.get("kwargs")
   function = dbutils.widgets.get("function")
   kwargs = {}  # note: kwargs_str is not parsed here, so kwargs is always empty
   _temp = __import__(froms, globals(), locals(), [imports])
   if not function:
       result = getattr(_temp, imports).main(**kwargs)
   else:
       result = getattr(getattr(_temp, imports), function)(**kwargs)
   
   This should be configured against a Databricks environment as per the documentation.
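   The notebook's dispatch logic can be exercised locally without Databricks. The sketch below registers a stand-in package in `sys.modules` (the `scripts_demo` names are hypothetical, not part of the original DAG) and mirrors the same `__import__`/`getattr` calls:

   ```python
   import sys
   import types

   # Hypothetical stand-ins for the real package layout: a package
   # 'scripts_demo' with a submodule 'generate_universe' exposing main().
   pkg = types.ModuleType("scripts_demo")
   mod = types.ModuleType("scripts_demo.generate_universe")
   mod.main = lambda **kwargs: "universe"
   mod.custom = lambda **kwargs: kwargs.get("n", 0) * 2
   pkg.generate_universe = mod
   sys.modules["scripts_demo"] = pkg
   sys.modules["scripts_demo.generate_universe"] = mod


   def dispatch(froms, imports, function="", kwargs=None):
       """Mirror of the notebook's dispatch: call main() or a named function."""
       kwargs = kwargs or {}
       _temp = __import__(froms, globals(), locals(), [imports])
       if not function:
           return getattr(_temp, imports).main(**kwargs)
       return getattr(getattr(_temp, imports), function)(**kwargs)


   print(dispatch("scripts_demo", "generate_universe"))                      # universe
   print(dispatch("scripts_demo", "generate_universe", "custom", {"n": 3}))  # 6
   ```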
   
   The Airflow API is being accessed in the following way:
   
   import requests
   import pandas as pd
   
   auth = (<username>, <password>)
   headers = {
       "Content-Type": "application/json"
   }
   url = <airflow_url>
   deployment_url = f"http://{url}/api/v1"
   
   
   def airflow_request(endpoint):
       return requests.get(
           url=f"{deployment_url}/{endpoint}",
           headers=headers,
           data='{}',
           auth=auth
       )
   
   
   # Get dag run ID
   dag_id = 'drsk'
   response = airflow_request(f"dags/{dag_id}/dagRuns?offset=100")
   json = response.json()
   dag_df = pd.DataFrame.from_dict(json['dag_runs'])
   
   # XCom example
   dag_run_id = 'manual__2022-11-21T06:22:17.169771+00:00'  # selected from the endpoint above
   task_id = 'generate_universe'
   xcom_key = 'run_page_url'
   # This endpoint works correctly and lists all relevant xcom keys
   response = airflow_request(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries")
   # This one returns the 404
   response = airflow_request(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}")
   json = response.json()
   xcom = pd.DataFrame.from_dict(json['xcom_entries'])
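   One thing worth checking on 2.1.2: the manual run ID contains `+00:00`, and if the `+` reaches the server unencoded it may be decoded as a space, which would produce exactly this kind of 404. A workaround sketch (reusing the names from the snippet above) is to percent-encode each path segment before building the URL:

   ```python
   from urllib.parse import quote

   dag_id = 'drsk'
   dag_run_id = 'manual__2022-11-21T06:22:17.169771+00:00'
   task_id = 'generate_universe'
   xcom_key = 'run_page_url'

   # Encode every path segment so ':' and '+' survive the round trip.
   path = "/".join(
       quote(part, safe="")
       for part in ["dags", dag_id, "dagRuns", dag_run_id,
                    "taskInstances", task_id, "xcomEntries", xcom_key]
   )
   print(path)
   # dags/drsk/dagRuns/manual__2022-11-21T06%3A22%3A17.169771%2B00%3A00/...
   ```

   The encoded path can then be passed to airflow_request(path) as before; if the 404 disappears, the problem is URL decoding rather than a missing entry.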
   
   
   ### Operating System
   
   Windows 10 for the API call, Ubuntu 20.04 for Airflow
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow[crypto,postgres]==2.1.2
   apache-airflow-providers-databricks==2.0.0
   apache-airflow-providers-microsoft-azure==3.1.0
   azure-servicebus
   
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker version 20.10.7, build 20.10.7-0ubuntu5~18.04.3
   Basic docker-compose deployment 
   
   ### Anything else
   
   
![image](https://user-images.githubusercontent.com/51231802/202996254-1e8d823b-0d14-4a98-83c0-6c8b2794a24a.png)
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

