dipesh747 opened a new issue, #27814:
URL: https://github.com/apache/airflow/issues/27814
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
When calling the "Get an XCom entry" endpoint, I get a 404 "Xcom entry not
found" error. All other endpoints work correctly and the URL is correct. When
I open the Airflow webserver, find the relevant dag_run and go to its XComs,
I can see the XCom keys and values.
Airflow version 2.1.2.
I have tested this in Airflow 2.4.2 and the error does not occur there, so it
seems to be specific to the earlier version.
### What you think should happen instead
The "Get an XCom entry" endpoint should work as described in the documentation.
### How to reproduce
The airflow task is an instance of: DatabricksSubmitRunOperator
In dag file:
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator


def create_databricks_task(task_id, froms, imports, function='', kwargs='',
                           trigger_rule='all_success'):
    """
    Args:
        task_id (str): Globally unique name of the task
        froms (str): Python package name
        imports (str): Module to import
        function (str): Name of the function to run. For scripts this is
            usually main.
        kwargs (str): Keyword arguments for module
        trigger_rule (str): State required for dag to trigger.
    Returns:
        An instance of the class DatabricksSubmitRunOperator
    """
    params = {
        'existing_cluster_id': cluster,
        'notebook_task': {
            'notebook_path': databricks_task_notebook_path,
            'base_parameters': {
                'from': froms,
                'import': imports,
                'function': function,
                'kwargs': kwargs
            }
        },
    }
    return DatabricksSubmitRunOperator(
        task_id=task_id,
        json=params,
        trigger_rule=trigger_rule,
        on_failure_callback=_teams_webhook_post,
        do_xcom_push=True,
        run_name=f"{task_id}--{froms}--{imports}"
    )

# Generate universe
task_id = 'generate_universe'
imports = 'generate_universe'
kwargs = ""
froms = 'scripts.drsk.drsk_universe_creation'
generate_universe = create_databricks_task(task_id=task_id, froms=froms,
                                           imports=imports, kwargs=kwargs)
In databricks it calls this notebook:
import logging
logging.getLogger("py4j").setLevel(logging.WARN)
dbutils.widgets.text("import", '')
dbutils.widgets.text("from", '')
dbutils.widgets.text("kwargs", '')
dbutils.widgets.text("function", '')
imports = dbutils.widgets.get("import")
froms = dbutils.widgets.get("from")
kwargs_str = dbutils.widgets.get("kwargs")
function = dbutils.widgets.get("function")
# kwargs_str is read above but not parsed; the call below always uses empty kwargs
kwargs = {}
_temp = __import__(f'{froms}', globals(), locals(), [f'{imports}'])
if not function:
    result = getattr(_temp, f'{imports}').main(**kwargs)
else:
    result = getattr(getattr(_temp, f'{imports}'), f'{function}')(**kwargs)
The operator should be configured against a Databricks environment as per the
documentation.
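For anyone reproducing this without a Databricks workspace, the dispatch the notebook performs can be sketched with the standard library alone. This is a rough equivalent under stated assumptions, not the notebook itself: `resolve_callable` is a hypothetical helper name, `importlib.import_module` stands in for the `__import__` call, and the `os` / `path` arguments are placeholders for the real package and module names.

```python
import importlib


def resolve_callable(froms, imports, function=""):
    """Return the callable the notebook would run (hypothetical helper).

    froms:    dotted name of the package or module to import
    imports:  attribute or submodule to look up inside it
    function: name of the function to call; falls back to "main" when empty
    """
    parent = importlib.import_module(froms)
    try:
        target = getattr(parent, imports)
    except AttributeError:
        # `imports` may be a submodule that has not been loaded yet
        target = importlib.import_module(f"{froms}.{imports}")
    return getattr(target, function or "main")


# Placeholder example using the standard library instead of the real scripts
basename = resolve_callable("os", "path", "basename")
print(basename("/tmp/x.txt"))
```

With an empty `function` the lookup falls back to `main`, which mirrors the `if not function` branch of the notebook.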
The airflow API is being accessed in the following way:
import requests
import pandas as pd

auth = (<username>, <password>)
headers = {
    "Content-Type": "application/json"
}
url = <airflow_url>
deployment_url = f"http://{url}/api/v1"


def airflow_request(url):
    return requests.get(
        url=f"{deployment_url}/{url}",
        headers=headers,
        data='{}',
        auth=auth
    )

# Get dag run ID
dag_id = 'drsk'
response = airflow_request(f"dags/{dag_id}/dagRuns?offset=100")
json = response.json()
dag_df = pd.DataFrame.from_dict(json['dag_runs'])

# Xcom example
dag_run_id = 'manual__2022-11-21T06:22:17.169771+00:00'  # selected from the endpoint above
task_id = 'generate_universe'
xcom_key = 'run_page_url'

# This endpoint works correctly and displays all relevant xcom_keys
response = airflow_request(f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries")

# This endpoint returns the 404
response = airflow_request(f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}")
json = response.json()
xcom = pd.DataFrame.from_dict(json['xcom_entries'])
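
One thing that may be worth ruling out when comparing the two calls is URL construction: the dag_run_id contains `:` and `+`, and the XCom paths above start with `/`, so the final URL has a double slash after `/api/v1`. The sketch below builds both XCom URLs with each path segment percent-encoded. It is a diagnostic aid only, not a confirmed cause of or fix for the 404; `xcom_url` is a hypothetical helper and `airflow.example.com` is a placeholder host.

```python
from urllib.parse import quote


def xcom_url(base_url, dag_id, dag_run_id, task_id, xcom_key=None):
    """Build the XCom list URL, or the single-entry URL when xcom_key is given."""
    segments = [
        "dags", quote(dag_id, safe=""),
        "dagRuns", quote(dag_run_id, safe=""),
        "taskInstances", quote(task_id, safe=""),
        "xcomEntries",
    ]
    if xcom_key is not None:
        segments.append(quote(xcom_key, safe=""))
    # Join with exactly one slash between the base URL and the path
    return base_url.rstrip("/") + "/" + "/".join(segments)


# Placeholder host; the other values mirror the ones used in this report
base = "http://airflow.example.com/api/v1"
run_id = "manual__2022-11-21T06:22:17.169771+00:00"
print(xcom_url(base, "drsk", run_id, "generate_universe"))
print(xcom_url(base, "drsk", run_id, "generate_universe", "run_page_url"))
```

Printing both URLs next to the ones produced by `airflow_request` makes any difference in slashes or encoding easy to spot.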
### Operating System
Windows 10 for the API call, Ubuntu 20.04 for Airflow
### Versions of Apache Airflow Providers
apache-airflow[crypto,postgres]==2.1.2
apache-airflow-providers-databricks==2.0.0
apache-airflow-providers-microsoft-azure==3.1.0
azure-servicebus
### Deployment
Docker-Compose
### Deployment details
Docker version 20.10.7, build 20.10.7-0ubuntu5~18.04.3
Basic docker-compose deployment
### Anything else

### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)