This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dceaada15 Renamed up/downstream references to producing/consuming (#25688)
8dceaada15 is described below

commit 8dceaada15f7a368ac97809822df876689e42954
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Aug 12 15:54:48 2022 +0100

    Renamed up/downstream references to producing/consuming (#25688)
    
    We changed the UI to use these new terms already, but the code wasn't
    changed. These names are clearer to me (and others) so I have made the
    code reflect them too.
---
 airflow/api_connexion/endpoints/dataset_endpoint.py    |  6 ++----
 airflow/api_connexion/openapi/v1.yaml                  |  4 ++--
 airflow/api_connexion/schemas/dataset_schema.py        |  4 ++--
 airflow/models/dataset.py                              |  4 ++--
 airflow/models/taskinstance.py                         |  6 +++---
 airflow/www/static/js/datasets/Details.tsx             | 12 ++++++------
 airflow/www/static/js/datasets/List.tsx                |  4 ++--
 airflow/www/static/js/types/api-generated.ts           |  4 ++--
 tests/api_connexion/endpoints/test_dataset_endpoint.py | 12 ++++++------
 tests/api_connexion/schemas/test_dataset_schema.py     | 12 ++++++------
 tests/models/test_dag.py                               |  4 ++--
 11 files changed, 35 insertions(+), 37 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py
index b6d4ac0ca5..dbfafec6e3 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -42,7 +42,7 @@ def get_dataset(id: int, session: Session = NEW_SESSION) -> APIResponse:
     """Get a Dataset"""
     dataset = (
         session.query(Dataset)
-        .options(joinedload(Dataset.downstream_dag_references), joinedload(Dataset.upstream_task_references))
+        .options(joinedload(Dataset.consuming_dags), joinedload(Dataset.producing_tasks))
         .get(id)
     )
     if not dataset:
@@ -73,9 +73,7 @@ def get_datasets(
         query = query.filter(Dataset.uri.ilike(f"%{uri_pattern}%"))
     query = apply_sorting(query, order_by, {}, allowed_attrs)
     datasets = (
-        query.options(
-            subqueryload(Dataset.downstream_dag_references), subqueryload(Dataset.upstream_task_references)
-        )
+        query.options(subqueryload(Dataset.consuming_dags), subqueryload(Dataset.producing_tasks))
         .offset(offset)
         .limit(limit)
         .all()
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 74baeaa122..1056d63a74 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3519,11 +3519,11 @@ components:
           type: string
           description: The dataset update time
           nullable: false
-        downstream_dag_references:
+        consuming_dags:
             type: array
             items:
               $ref: '#/components/schemas/DatasetDagRef'
-        upstream_task_references:
+        producing_tasks:
             type: array
             items:
               $ref: '#/components/schemas/DatasetTaskRef'
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 7ff3e638e1..ffb8b88e4b 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -64,8 +64,8 @@ class DatasetSchema(SQLAlchemySchema):
     extra = JsonObjectField()
     created_at = auto_field()
     updated_at = auto_field()
-    upstream_task_references = fields.List(fields.Nested(DatasetTaskRefSchema))
-    downstream_dag_references = fields.List(fields.Nested(DatasetDagRefSchema))
+    producing_tasks = fields.List(fields.Nested(DatasetTaskRefSchema))
+    consuming_dags = fields.List(fields.Nested(DatasetDagRefSchema))
 
 
 class DatasetCollection(NamedTuple):
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index c1dfd866c5..bec94b5064 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -52,8 +52,8 @@ class Dataset(Base):
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
 
-    downstream_dag_references = relationship("DatasetDagRef", back_populates="dataset")
-    upstream_task_references = relationship("DatasetTaskRef", back_populates="dataset")
+    consuming_dags = relationship("DatasetDagRef", back_populates="dataset")
+    producing_tasks = relationship("DatasetTaskRef", back_populates="dataset")
 
     __tablename__ = "dataset"
     __table_args__ = (
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 502c863876..7c9cac7f24 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1538,8 +1538,8 @@ class TaskInstance(Base, LoggingMixin):
                 if not dataset:
                     self.log.warning("Dataset %s not found", obj)
                     continue
-                downstream_dag_ids = [x.dag_id for x in dataset.downstream_dag_references]
-                self.log.debug("downstream dag ids %s", downstream_dag_ids)
+                consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
+                self.log.debug("consuming dag ids %s", consuming_dag_ids)
                 session.add(
                     DatasetEvent(
                         dataset_id=dataset.id,
@@ -1549,7 +1549,7 @@ class TaskInstance(Base, LoggingMixin):
                         source_map_index=self.map_index,
                     )
                 )
-                for dag_id in downstream_dag_ids:
+                for dag_id in consuming_dag_ids:
                     session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id))
 
     def _execute_task_with_callbacks(self, context, test_mode=False):
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index c412c2b12e..d871fe317f 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -43,8 +43,8 @@ const gridUrl = getMetaValue('grid_url');
 const Details = ({
   dataset: {
     uri,
-    upstreamTaskReferences,
-    downstreamDagReferences,
+    producingTasks,
+    consumingDags,
   },
 }: { dataset: API.Dataset }) => (
   <Box>
@@ -54,13 +54,13 @@ const Details = ({
       {uri}
       <ClipboardButton value={uri} iconOnly ml={2} />
     </Heading>
-    {upstreamTaskReferences && !!upstreamTaskReferences.length && (
+    {producingTasks && !!producingTasks.length && (
     <Box mb={2}>
       <Flex alignItems="center">
         <Heading size="md" fontWeight="normal">Producing Tasks</Heading>
         <InfoTooltip label="Tasks that will update this dataset." size={14} />
       </Flex>
-      {upstreamTaskReferences.map(({ dagId, taskId }) => (
+      {producingTasks.map(({ dagId, taskId }) => (
         <Link
           key={`${dagId}.${taskId}`}
           color="blue.600"
@@ -72,13 +72,13 @@ const Details = ({
       ))}
     </Box>
     )}
-    {downstreamDagReferences && !!downstreamDagReferences.length && (
+    {consumingDags && !!consumingDags.length && (
     <Box>
       <Flex alignItems="center">
         <Heading size="md" fontWeight="normal">Consuming DAGs</Heading>
         <InfoTooltip label="DAGs that depend on this dataset updating to trigger a run." size={14} />
       </Flex>
-      {downstreamDagReferences.map(({ dagId }) => (
+      {consumingDags.map(({ dagId }) => (
         <Link
           key={dagId}
           color="blue.600"
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index 79fe4e5442..4765baa02d 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -72,13 +72,13 @@ const DatasetsList = ({ onSelect }: Props) => {
       },
       {
         Header: UpstreamHeader,
-        accessor: 'upstreamTaskReferences',
+        accessor: 'producingTasks',
         Cell: ({ cell: { value } }: any) => value.length,
         disableSortBy: true,
       },
       {
         Header: DownstreamHeader,
-        accessor: 'downstreamDagReferences',
+        accessor: 'consumingDags',
         Cell: ({ cell: { value } }: any) => value.length,
         disableSortBy: true,
       },
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 0b016f430f..8df89497f7 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1477,8 +1477,8 @@ export interface components {
       created_at?: string;
       /** @description The dataset update time */
       updated_at?: string;
-      downstream_dag_references?: components["schemas"]["DatasetDagRef"][];
-      upstream_task_references?: components["schemas"]["DatasetTaskRef"][];
+      consuming_dags?: components["schemas"]["DatasetDagRef"][];
+      producing_tasks?: components["schemas"]["DatasetTaskRef"][];
     };
     /**
      * @description A datasets reference to an upstream task.
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 6e71592785..3343b8d208 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -89,8 +89,8 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
             "extra": {'foo': 'bar'},
             "created_at": self.default_time,
             "updated_at": self.default_time,
-            "downstream_dag_references": [],
-            "upstream_task_references": [],
+            "consuming_dags": [],
+            "producing_tasks": [],
         }
 
     def test_should_respond_404(self):
@@ -138,8 +138,8 @@ class TestGetDatasets(TestDatasetEndpoint):
                     "extra": {'foo': 'bar'},
                     "created_at": self.default_time,
                     "updated_at": self.default_time,
-                    "downstream_dag_references": [],
-                    "upstream_task_references": [],
+                    "consuming_dags": [],
+                    "producing_tasks": [],
                 },
                 {
                     "id": 2,
@@ -147,8 +147,8 @@ class TestGetDatasets(TestDatasetEndpoint):
                     "extra": {'foo': 'bar'},
                     "created_at": self.default_time,
                     "updated_at": self.default_time,
-                    "downstream_dag_references": [],
-                    "upstream_task_references": [],
+                    "consuming_dags": [],
+                    "producing_tasks": [],
                 },
             ],
             "total_entries": 2,
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 9703e2f0c3..6f36718300 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -67,14 +67,14 @@ class TestDatasetSchema(TestDatasetSchemaBase):
             "extra": {'foo': 'bar'},
             "created_at": self.timestamp,
             "updated_at": self.timestamp,
-            "downstream_dag_references": [
+            "consuming_dags": [
                 {
                     "dag_id": "test_dataset_downstream_schema",
                     "created_at": self.timestamp,
                     "updated_at": self.timestamp,
                 }
             ],
-            "upstream_task_references": [
+            "producing_tasks": [
                 {
                     "task_id": "task1",
                     "dag_id": "test_dataset_upstream_schema",
@@ -110,8 +110,8 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
                     "extra": {'foo': 'bar'},
                     "created_at": self.timestamp,
                     "updated_at": self.timestamp,
-                    "downstream_dag_references": [],
-                    "upstream_task_references": [],
+                    "consuming_dags": [],
+                    "producing_tasks": [],
                 },
                 {
                     "id": 2,
@@ -119,8 +119,8 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
                     "extra": {'foo': 'bar'},
                     "created_at": self.timestamp,
                     "updated_at": self.timestamp,
-                    "downstream_dag_references": [],
-                    "upstream_task_references": [],
+                    "consuming_dags": [],
+                    "producing_tasks": [],
                 },
             ],
             "total_entries": 2,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 083b680f17..b9d812728c 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -841,8 +841,8 @@ class TestDag:
         d2 = stored_datasets[d2.uri]
         d3 = stored_datasets[d3.uri]
         assert stored_datasets[uri1].extra == {"should": "be used"}
-        assert [x.dag_id for x in d1.downstream_dag_references] == [dag_id1]
-        assert [(x.task_id, x.dag_id) for x in d1.upstream_task_references] == [(task_id, dag_id2)]
+        assert [x.dag_id for x in d1.consuming_dags] == [dag_id1]
+        assert [(x.task_id, x.dag_id) for x in d1.producing_tasks] == [(task_id, dag_id2)]
         assert set(
             session.query(DatasetTaskRef.task_id, DatasetTaskRef.dag_id, DatasetTaskRef.dataset_id)
             .filter(DatasetTaskRef.dag_id.in_((dag_id1, dag_id2)))

Reply via email to