This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8dceaada15 Renamed up/downstream references to producing/consuming (#25688)
8dceaada15 is described below
commit 8dceaada15f7a368ac97809822df876689e42954
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Aug 12 15:54:48 2022 +0100
Renamed up/downstream references to producing/consuming (#25688)
We changed the UI to use these new terms already, but the code wasn't
changed. These names are clearer to me (and others), so I have made the
code reflect them too.
---
airflow/api_connexion/endpoints/dataset_endpoint.py | 6 ++----
airflow/api_connexion/openapi/v1.yaml | 4 ++--
airflow/api_connexion/schemas/dataset_schema.py | 4 ++--
airflow/models/dataset.py | 4 ++--
airflow/models/taskinstance.py | 6 +++---
airflow/www/static/js/datasets/Details.tsx | 12 ++++++------
airflow/www/static/js/datasets/List.tsx | 4 ++--
airflow/www/static/js/types/api-generated.ts | 4 ++--
tests/api_connexion/endpoints/test_dataset_endpoint.py | 12 ++++++------
tests/api_connexion/schemas/test_dataset_schema.py | 12 ++++++------
tests/models/test_dag.py | 4 ++--
11 files changed, 35 insertions(+), 37 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py
index b6d4ac0ca5..dbfafec6e3 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -42,7 +42,7 @@ def get_dataset(id: int, session: Session = NEW_SESSION) -> APIResponse:
"""Get a Dataset"""
dataset = (
session.query(Dataset)
- .options(joinedload(Dataset.downstream_dag_references),
joinedload(Dataset.upstream_task_references))
+ .options(joinedload(Dataset.consuming_dags),
joinedload(Dataset.producing_tasks))
.get(id)
)
if not dataset:
@@ -73,9 +73,7 @@ def get_datasets(
query = query.filter(Dataset.uri.ilike(f"%{uri_pattern}%"))
query = apply_sorting(query, order_by, {}, allowed_attrs)
datasets = (
- query.options(
- subqueryload(Dataset.downstream_dag_references),
subqueryload(Dataset.upstream_task_references)
- )
+ query.options(subqueryload(Dataset.consuming_dags),
subqueryload(Dataset.producing_tasks))
.offset(offset)
.limit(limit)
.all()
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 74baeaa122..1056d63a74 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3519,11 +3519,11 @@ components:
type: string
description: The dataset update time
nullable: false
- downstream_dag_references:
+ consuming_dags:
type: array
items:
$ref: '#/components/schemas/DatasetDagRef'
- upstream_task_references:
+ producing_tasks:
type: array
items:
$ref: '#/components/schemas/DatasetTaskRef'
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 7ff3e638e1..ffb8b88e4b 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -64,8 +64,8 @@ class DatasetSchema(SQLAlchemySchema):
extra = JsonObjectField()
created_at = auto_field()
updated_at = auto_field()
- upstream_task_references = fields.List(fields.Nested(DatasetTaskRefSchema))
- downstream_dag_references = fields.List(fields.Nested(DatasetDagRefSchema))
+ producing_tasks = fields.List(fields.Nested(DatasetTaskRefSchema))
+ consuming_dags = fields.List(fields.Nested(DatasetDagRefSchema))
class DatasetCollection(NamedTuple):
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index c1dfd866c5..bec94b5064 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -52,8 +52,8 @@ class Dataset(Base):
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow,
onupdate=timezone.utcnow, nullable=False)
- downstream_dag_references = relationship("DatasetDagRef",
back_populates="dataset")
- upstream_task_references = relationship("DatasetTaskRef",
back_populates="dataset")
+ consuming_dags = relationship("DatasetDagRef", back_populates="dataset")
+ producing_tasks = relationship("DatasetTaskRef", back_populates="dataset")
__tablename__ = "dataset"
__table_args__ = (
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 502c863876..7c9cac7f24 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1538,8 +1538,8 @@ class TaskInstance(Base, LoggingMixin):
if not dataset:
self.log.warning("Dataset %s not found", obj)
continue
- downstream_dag_ids = [x.dag_id for x in
dataset.downstream_dag_references]
- self.log.debug("downstream dag ids %s", downstream_dag_ids)
+ consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
+ self.log.debug("consuming dag ids %s", consuming_dag_ids)
session.add(
DatasetEvent(
dataset_id=dataset.id,
@@ -1549,7 +1549,7 @@ class TaskInstance(Base, LoggingMixin):
source_map_index=self.map_index,
)
)
- for dag_id in downstream_dag_ids:
+ for dag_id in consuming_dag_ids:
session.merge(DatasetDagRunQueue(dataset_id=dataset.id,
target_dag_id=dag_id))
def _execute_task_with_callbacks(self, context, test_mode=False):
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index c412c2b12e..d871fe317f 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -43,8 +43,8 @@ const gridUrl = getMetaValue('grid_url');
const Details = ({
dataset: {
uri,
- upstreamTaskReferences,
- downstreamDagReferences,
+ producingTasks,
+ consumingDags,
},
}: { dataset: API.Dataset }) => (
<Box>
@@ -54,13 +54,13 @@ const Details = ({
{uri}
<ClipboardButton value={uri} iconOnly ml={2} />
</Heading>
- {upstreamTaskReferences && !!upstreamTaskReferences.length && (
+ {producingTasks && !!producingTasks.length && (
<Box mb={2}>
<Flex alignItems="center">
<Heading size="md" fontWeight="normal">Producing Tasks</Heading>
<InfoTooltip label="Tasks that will update this dataset." size={14} />
</Flex>
- {upstreamTaskReferences.map(({ dagId, taskId }) => (
+ {producingTasks.map(({ dagId, taskId }) => (
<Link
key={`${dagId}.${taskId}`}
color="blue.600"
@@ -72,13 +72,13 @@ const Details = ({
))}
</Box>
)}
- {downstreamDagReferences && !!downstreamDagReferences.length && (
+ {consumingDags && !!consumingDags.length && (
<Box>
<Flex alignItems="center">
<Heading size="md" fontWeight="normal">Consuming DAGs</Heading>
<InfoTooltip label="DAGs that depend on this dataset updating to
trigger a run." size={14} />
</Flex>
- {downstreamDagReferences.map(({ dagId }) => (
+ {consumingDags.map(({ dagId }) => (
<Link
key={dagId}
color="blue.600"
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index 79fe4e5442..4765baa02d 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -72,13 +72,13 @@ const DatasetsList = ({ onSelect }: Props) => {
},
{
Header: UpstreamHeader,
- accessor: 'upstreamTaskReferences',
+ accessor: 'producingTasks',
Cell: ({ cell: { value } }: any) => value.length,
disableSortBy: true,
},
{
Header: DownstreamHeader,
- accessor: 'downstreamDagReferences',
+ accessor: 'consumingDags',
Cell: ({ cell: { value } }: any) => value.length,
disableSortBy: true,
},
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 0b016f430f..8df89497f7 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1477,8 +1477,8 @@ export interface components {
created_at?: string;
/** @description The dataset update time */
updated_at?: string;
- downstream_dag_references?: components["schemas"]["DatasetDagRef"][];
- upstream_task_references?: components["schemas"]["DatasetTaskRef"][];
+ consuming_dags?: components["schemas"]["DatasetDagRef"][];
+ producing_tasks?: components["schemas"]["DatasetTaskRef"][];
};
/**
* @description A datasets reference to an upstream task.
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 6e71592785..3343b8d208 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -89,8 +89,8 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
"extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
- "downstream_dag_references": [],
- "upstream_task_references": [],
+ "consuming_dags": [],
+ "producing_tasks": [],
}
def test_should_respond_404(self):
@@ -138,8 +138,8 @@ class TestGetDatasets(TestDatasetEndpoint):
"extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
- "downstream_dag_references": [],
- "upstream_task_references": [],
+ "consuming_dags": [],
+ "producing_tasks": [],
},
{
"id": 2,
@@ -147,8 +147,8 @@ class TestGetDatasets(TestDatasetEndpoint):
"extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
- "downstream_dag_references": [],
- "upstream_task_references": [],
+ "consuming_dags": [],
+ "producing_tasks": [],
},
],
"total_entries": 2,
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 9703e2f0c3..6f36718300 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -67,14 +67,14 @@ class TestDatasetSchema(TestDatasetSchemaBase):
"extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
- "downstream_dag_references": [
+ "consuming_dags": [
{
"dag_id": "test_dataset_downstream_schema",
"created_at": self.timestamp,
"updated_at": self.timestamp,
}
],
- "upstream_task_references": [
+ "producing_tasks": [
{
"task_id": "task1",
"dag_id": "test_dataset_upstream_schema",
@@ -110,8 +110,8 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
"extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
- "downstream_dag_references": [],
- "upstream_task_references": [],
+ "consuming_dags": [],
+ "producing_tasks": [],
},
{
"id": 2,
@@ -119,8 +119,8 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
"extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
- "downstream_dag_references": [],
- "upstream_task_references": [],
+ "consuming_dags": [],
+ "producing_tasks": [],
},
],
"total_entries": 2,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 083b680f17..b9d812728c 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -841,8 +841,8 @@ class TestDag:
d2 = stored_datasets[d2.uri]
d3 = stored_datasets[d3.uri]
assert stored_datasets[uri1].extra == {"should": "be used"}
- assert [x.dag_id for x in d1.downstream_dag_references] == [dag_id1]
- assert [(x.task_id, x.dag_id) for x in d1.upstream_task_references] ==
[(task_id, dag_id2)]
+ assert [x.dag_id for x in d1.consuming_dags] == [dag_id1]
+ assert [(x.task_id, x.dag_id) for x in d1.producing_tasks] ==
[(task_id, dag_id2)]
assert set(
session.query(DatasetTaskRef.task_id, DatasetTaskRef.dag_id,
DatasetTaskRef.dataset_id)
.filter(DatasetTaskRef.dag_id.in_((dag_id1, dag_id2)))