jedcunningham commented on code in PR #37101:
URL: https://github.com/apache/airflow/pull/37101#discussion_r1499600781


##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),
+    ) as dag:

Review Comment:
   ```suggestion
       ):
   ```
   
   nit



##########
airflow/example_dags/example_datasets.py:
##########
@@ -15,26 +15,40 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example DAG for demonstrating behavior of Datasets feature.
+Example DAG for demonstrating the behavior of the Datasets feature in Airflow, 
including conditional and
+dataset expression-based scheduling.
 
 Notes on usage:
 
-Turn on all the dags.
+Turn on all the DAGs.
 
-DAG dataset_produces_1 should run because it's on a schedule.
+dataset_produces_1 is scheduled to run daily. Once it completes, it triggers 
several DAGs due to its dataset
+being updated. dataset_consumes_1 is triggered immediately, as it depends 
solely on the dataset produced by
+dataset_produces_1. consume_1_or_2_with_dataset_expressions will also be 
triggered, as its condition of
+either dataset_produces_1 or dataset_produces_2 being updated is satisfied 
with dataset_produces_1.
 
-After dataset_produces_1 runs, dataset_consumes_1 should be triggered 
immediately
-because its only dataset dependency is managed by dataset_produces_1.
+dataset_consumes_1_and_2 will not be triggered after dataset_produces_1 runs 
because it requires the dataset
+from dataset_produces_2, which has no schedule and must be manually triggered.
 
-No other dags should be triggered.  Note that even though 
dataset_consumes_1_and_2 depends on
-the dataset in dataset_produces_1, it will not be triggered until 
dataset_produces_2 runs
-(and dataset_produces_2 is left with no schedule so that we can trigger it 
manually).
+After manually triggering dataset_produces_2, several DAGs will be affected. 
dataset_consumes_1_and_2 should
+run because both its dataset dependencies are now met. 
consume_1_and_2_with_dataset_expressions will be
+triggered, as it requires both dataset_produces_1 and dataset_produces_2 
datasets to be updated.
+consume_1_or_2_with_dataset_expressions will be triggered again, since it's 
conditionally set to run when
+either dataset is updated.
 
-Next, trigger dataset_produces_2.  After dataset_produces_2 finishes,
-dataset_consumes_1_and_2 should run.
+consume_1_or_both_2_and_3_with_dataset_expressions demonstrates complex 
dataset dependency logic.
+This DAG triggers if dataset_produces_1 is updated or if both 
dataset_produces_2 and dag3_dataset
+are updated. This example highlights the capability to combine updates from 
multiple datasets with logical
+expressions for advanced scheduling.
 
-Dags dataset_consumes_1_never_scheduled and 
dataset_consumes_unknown_never_scheduled should not run because
-they depend on datasets that never get updated.
+conditional_dataset_and_time_based_timetable illustrates the integration of 
time-based scheduling with
+dataset dependencies. This DAG is configured to execute either when both 
dataset_produces_1 and
+dataset_produces_2 datasets have been updated or according to a specific cron 
schedule, showcasing
+Airflow's versatility in handling mixed triggers for dataset and time-based 
scheduling.
+

Review Comment:
   ```suggestion
   ```
   
   nit: extra newline



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_or_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Complex Conditional Logic**
+
+For scenarios requiring more intricate conditions, such as triggering a DAG 
when one dataset is updated or when both of two other datasets are updated, 
combine the OR and AND operators:
+
+.. code-block:: python
+
+    dag3_dataset = Dataset("s3://dag3/output_3.txt")
+
+    with DAG(
+        dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )

Review Comment:
   ```suggestion
           ...
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",

Review Comment:
   ```suggestion
           # Consume dataset 1 and 2 with dataset expressions
   ```
   
   A comment is probably even better than an example DAG id.



##########
docs/apache-airflow/authoring-and-scheduling/timetable.rst:
##########
@@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``:
 
 In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC 
based on the ``CronTriggerTimetable``, and it is also triggered by updates to 
``dag1_dataset``.
 
-Future Enhancements
-~~~~~~~~~~~~~~~~~~~
-Future iterations may introduce more complex combinations for scheduling 
(e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility 
for scheduling DAGs in various scenarios.
+Integrate conditional dataset with Time-Based Scheduling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Combining conditional dataset expressions with time-based schedules enhances 
scheduling flexibility:
+
+.. code-block:: python
+
+    from airflow.timetables import DatasetOrTimeSchedule
+    from airflow.timetables.trigger import CronTriggerTimetable
+
+    with DAG(
+        dag_id="conditional_dataset_and_time_based_timetable",

Review Comment:
   ```suggestion
           # Conditional dataset and time based timetable
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+

Review Comment:
   My 2c, these don't have to be fully functional. I think it's easier if we 
just have the parts that are critical to the topic at hand.



##########
airflow/datasets/__init__.py:
##########
@@ -106,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None:
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(*self.objects, other)
+        return NotImplemented
+
+    def __and__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAll(self, other)
+        return NotImplemented
+
+    def __repr__(self) -> str:
+        return f"DatasetAny({', '.join(map(str, self.objects))})"
+
 
 class DatasetAll(_DatasetBooleanCondition):
     """Use to combine datasets schedule references in an "or" relationship."""
 
     agg_func = all
+
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll):
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):

Review Comment:
   ```suggestion
       def __or__(self, other: Dataset | DatasetAny | DatasetAll) -> DatasetAny:
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_or_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )

Review Comment:
   ```suggestion
           ...
   ```



##########
airflow/datasets/__init__.py:
##########
@@ -106,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None:
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(*self.objects, other)
+        return NotImplemented
+
+    def __and__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAll(self, other)
+        return NotImplemented
+
+    def __repr__(self) -> str:
+        return f"DatasetAny({', '.join(map(str, self.objects))})"
+
 
 class DatasetAll(_DatasetBooleanCondition):
     """Use to combine datasets schedule references in an "or" relationship."""
 
     agg_func = all
+
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll):
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(self, other)
+        return NotImplemented
+
+    def __and__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAll(*self.objects, other)
+        return NotImplemented
+
+    def __repr__(self) -> str:
+        return f"DatasetAll({', '.join(map(str, self.objects))})"
+
+
+class DatasetsExpression:
+    """
+    Represents a node in an expression tree for dataset conditions.
+
+    :param value: The value of the node, which can be a 'Dataset', '&', or '|'.
+    :param left: The left child node.
+    :param right: The right child node.
+    """
+
+    def __init__(self, value, left=None, right=None) -> None:
+        self.value = value  # value can be 'Dataset', '&', or '|'
+        self.left = left
+        self.right = right
+
+    def __or__(self, other: Dataset | DatasetsExpression) -> 
DatasetsExpression:
+        return DatasetsExpression("|", self, other)
+
+    def __and__(self, other: Dataset | DatasetsExpression) -> 
DatasetsExpression:
+        return DatasetsExpression("&", self, other)
+
+    def __repr__(self) -> str:
+        if isinstance(self.value, Dataset):
+            return f"Dataset(uri='{self.value.uri}')"
+        elif self.value == "&":
+            return repr(DatasetAll(self.left, self.right))
+        elif self.value == "|":
+            return repr(DatasetAny(self.left, self.right))
+        else:
+            return f"Invalid DatasetsExpression(value={self.value})"
+
+
+def extract_datasets(
+    dataset_expression: DatasetsExpression | Dataset,
+) -> BaseDatasetEventInput:
+    """
+    Extract datasets from the given DatasetsExpression.
+
+    :param dataset_expression: The DatasetsExpression to extract from.
+    """
+    if isinstance(dataset_expression, DatasetsExpression):
+        if dataset_expression.value == "&":
+            return DatasetAll(dataset_expression.left, 
dataset_expression.right)
+        elif dataset_expression.value == "|":
+            return DatasetAny(dataset_expression.left, 
dataset_expression.right)
+        raise ValueError("Invalid Expression node value")
+    return dataset_expression

Review Comment:
   ```suggestion
       if not isinstance(dataset_expression, DatasetsExpression):
           return dataset_expression
   
       if dataset_expression.value == "&":
           return DatasetAll(dataset_expression.left, dataset_expression.right)
       elif dataset_expression.value == "|":
           return DatasetAny(dataset_expression.left, dataset_expression.right)
       raise ValueError("Invalid Expression node value")
   ```
   
   Bikeshedding a bit, but this removes some nesting.



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),

Review Comment:
   ```suggestion
           schedule=(dag1_dataset & dag2_dataset),
           ...
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),

Review Comment:
   ```suggestion
           schedule=(dag1_dataset | dag2_dataset),
           ...
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:

Review Comment:
   ```suggestion
       ):
   ```
   
   nit



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_or_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Complex Conditional Logic**
+
+For scenarios requiring more intricate conditions, such as triggering a DAG 
when one dataset is updated or when both of two other datasets are updated, 
combine the OR and AND operators:
+
+.. code-block:: python
+
+    dag3_dataset = Dataset("s3://dag3/output_3.txt")
+
+    with DAG(
+        dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
+    ) as dag:

Review Comment:
   ```suggestion
       ):
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )

Review Comment:
   ```suggestion
           ...
   ```
   
   This is just the consumer, we don't need the downstream tasks.



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_or_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Complex Conditional Logic**
+
+For scenarios requiring more intricate conditions, such as triggering a DAG 
when one dataset is updated or when both of two other datasets are updated, 
combine the OR and AND operators:
+
+.. code-block:: python
+
+    dag3_dataset = Dataset("s3://dag3/output_3.txt")
+
+    with DAG(
+        dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),

Review Comment:
   ```suggestion
           schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
           ...
   ```



##########
airflow/datasets/__init__.py:
##########
@@ -106,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None:
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(*self.objects, other)
+        return NotImplemented
+
+    def __and__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAll(self, other)
+        return NotImplemented
+
+    def __repr__(self) -> str:
+        return f"DatasetAny({', '.join(map(str, self.objects))})"
+
 
 class DatasetAll(_DatasetBooleanCondition):
     """Use to combine datasets schedule references in an "or" relationship."""
 
     agg_func = all
+
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll):
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(self, other)
+        return NotImplemented
+
+    def __and__(self, other):

Review Comment:
   ```suggestion
       def __and__(self, other: Dataset  | DatasetAny | DatasetAll) -> 
DatasetAll:
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -245,3 +324,5 @@ DatasetTimetable Integration
 With the introduction of ``DatasetTimetable``, it is now possible to schedule 
DAGs based on both dataset events and time-based schedules. This feature offers 
flexibility for scenarios where a DAG needs to be triggered by data updates as 
well as run periodically according to a fixed timetable.
 
 For more detailed information on ``DatasetTimetable`` and its usage, refer to 
the corresponding section in :ref:`DatasetTimetable 
<dataset-timetable-section>`.
+
+These examples illustrate how Airflow's conditional dataset expressions can be 
used to create complex, data-dependent scheduling scenarios, providing precise 
control over when DAGs are triggered in response to data updates.

Review Comment:
   ```suggestion
   These examples illustrate how Airflow's conditional dataset expressions can 
be used to create complex data-dependent scheduling scenarios, providing 
precise control over when DAGs are triggered in response to data updates.
   ```



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset | dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_or_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Complex Conditional Logic**
+
+For scenarios requiring more intricate conditions, such as triggering a DAG 
when one dataset is updated or when both of two other datasets are updated, 
combine the OR and AND operators:
+
+.. code-block:: python
+
+    dag3_dataset = Dataset("s3://dag3/output_3.txt")
+
+    with DAG(
+        dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",

Review Comment:
   ```suggestion
           # Consume dataset 1 or both 2 and 3 with dataset expressions
   ```



##########
airflow/datasets/__init__.py:
##########
@@ -73,6 +73,12 @@ def __eq__(self, other):
     def __hash__(self):
         return hash(self.uri)
 
+    def __or__(self, other: Dataset):

Review Comment:
   ```suggestion
       def __or__(self, other: Dataset | DatasetAny | DatasetAll) -> DatasetAny:
   ```
   
   Can't other be anything?



##########
docs/apache-airflow/authoring-and-scheduling/datasets.rst:
##########
@@ -237,6 +237,85 @@ Example:
 
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one Dataset given to the DAG, and the first of one 
DatasetEvent for that Dataset. An implementation may be quite complex if you 
have multiple Datasets, potentially with multiple DatasetEvents.
 
+Advanced Dataset Scheduling with Conditional Expressions
+--------------------------------------------------------
+
+Apache Airflow introduces advanced scheduling capabilities that leverage 
conditional expressions with datasets. This feature allows Airflow users to 
define complex dependencies for DAG executions based on dataset updates, using 
logical operators for more granular control over workflow triggers.
+
+Logical Operators for Datasets
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow supports two logical operators for combining dataset conditions:
+
+- **AND (``&``)**: Specifies that the DAG should be triggered only after all 
of the specified datasets have been updated.
+- **OR (``|``)**: Specifies that the DAG should be triggered when any one of 
the specified datasets is updated.
+
+These operators enable the expression of complex dataset update conditions, 
enhancing the dynamism and flexibility of Airflow workflows.
+
+Example Usage
+-------------
+
+**Scheduling Based on Multiple Dataset Updates**
+
+To schedule a DAG to run only when two specific datasets have both been 
updated, use the AND operator (``&``):
+
+.. code-block:: python
+
+    from airflow.models import DAG
+    from airflow.operators.bash import BashOperator
+    from airflow.datasets import Dataset
+    import pendulum
+
+    dag1_dataset = Dataset("s3://dag1/output_1.txt")
+    dag2_dataset = Dataset("s3://dag2/output_1.txt")
+
+    with DAG(
+        dag_id="consume_1_and_2_with_dataset_expressions",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=(dag1_dataset & dag2_dataset),
+    ) as dag:
+        BashOperator(
+            task_id="consume_1_and_2_with_dataset_expressions",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+        )
+
+**Scheduling Based on Any Dataset Update**
+
+To trigger a DAG execution when either of two datasets is updated, apply the 
OR operator (``|``):
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="consume_1_or_2_with_dataset_expressions",

Review Comment:
   ```suggestion
           # Consume dataset 1 or 2 with dataset expressions
   ```



##########
docs/apache-airflow/authoring-and-scheduling/timetable.rst:
##########
@@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``:
 
 In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC 
based on the ``CronTriggerTimetable``, and it is also triggered by updates to 
``dag1_dataset``.
 
-Future Enhancements
-~~~~~~~~~~~~~~~~~~~
-Future iterations may introduce more complex combinations for scheduling 
(e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility 
for scheduling DAGs in various scenarios.
+Integrate conditional dataset with Time-Based Scheduling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Combining conditional dataset expressions with time-based schedules enhances 
scheduling flexibility:
+
+.. code-block:: python
+
+    from airflow.timetables import DatasetOrTimeSchedule
+    from airflow.timetables.trigger import CronTriggerTimetable
+
+    with DAG(
+        dag_id="conditional_dataset_and_time_based_timetable",
+        catchup=False,
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=DatasetOrTimeSchedule(
+            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), 
datasets=(dag1_dataset & dag2_dataset)
+        ),
+        tags=["dataset-time-based-timetable"],

Review Comment:
   ```suggestion
           schedule=DatasetOrTimeSchedule(
               timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), 
datasets=(dag1_dataset & dag2_dataset)
           ),
           ...
   ```



##########
docs/apache-airflow/authoring-and-scheduling/timetable.rst:
##########
@@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``:
 
 In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC 
based on the ``CronTriggerTimetable``, and it is also triggered by updates to 
``dag1_dataset``.
 
-Future Enhancements
-~~~~~~~~~~~~~~~~~~~
-Future iterations may introduce more complex combinations for scheduling 
(e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility 
for scheduling DAGs in various scenarios.
+Integrate conditional dataset with Time-Based Scheduling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Combining conditional dataset expressions with time-based schedules enhances 
scheduling flexibility:
+
+.. code-block:: python
+
+    from airflow.timetables import DatasetOrTimeSchedule
+    from airflow.timetables.trigger import CronTriggerTimetable
+
+    with DAG(

Review Comment:
   The rest of the examples on this page use taskflow, we should probably 
convert to that style too.



##########
docs/apache-airflow/authoring-and-scheduling/timetable.rst:
##########
@@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``:
 
 In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC 
based on the ``CronTriggerTimetable``, and it is also triggered by updates to 
``dag1_dataset``.
 
-Future Enhancements
-~~~~~~~~~~~~~~~~~~~
-Future iterations may introduce more complex combinations for scheduling 
(e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility 
for scheduling DAGs in various scenarios.
+Integrate conditional dataset with Time-Based Scheduling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Combining conditional dataset expressions with time-based schedules enhances 
scheduling flexibility:
+
+.. code-block:: python
+
+    from airflow.timetables import DatasetOrTimeSchedule
+    from airflow.timetables.trigger import CronTriggerTimetable
+
+    with DAG(
+        dag_id="conditional_dataset_and_time_based_timetable",
+        catchup=False,
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=DatasetOrTimeSchedule(
+            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), 
datasets=(dag1_dataset & dag2_dataset)
+        ),
+        tags=["dataset-time-based-timetable"],
+    ) as dag:
+        BashOperator(
+            task_id="conditional_dataset_and_time_based_timetable",
+            bash_command="sleep 5",
+            
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
+        )

Review Comment:
   ```suggestion
           ...
   ```



##########
tests/datasets/test_dataset.py:
##########
@@ -269,3 +269,69 @@ def test_dag_with_complex_dataset_triggers(session, 
dag_maker):
     assert isinstance(
         serialized_dag_dict["dataset_triggers"], dict
     ), "Serialized 'dataset_triggers' should be a dict"
+
+
+def datasets_equal(d1, d2):
+    if type(d1) != type(d2):
+        return False
+
+    if isinstance(d1, Dataset):
+        return d1.uri == d2.uri
+
+    elif isinstance(d1, (DatasetAny, DatasetAll)):
+        if len(d1.objects) != len(d2.objects):
+            return False
+
+        # Compare each pair of objects
+        for obj1, obj2 in zip(d1.objects, d2.objects):
+            # If obj1 or obj2 is a Dataset, DatasetAny, or DatasetAll instance,
+            # recursively call datasets_equal
+            if not datasets_equal(obj1, obj2):
+                return False
+        return True
+
+    return False
+
+
+dataset1 = Dataset(uri="s3://bucket1/data1")
+dataset2 = Dataset(uri="s3://bucket2/data2")
+dataset3 = Dataset(uri="s3://bucket3/data3")
+dataset4 = Dataset(uri="s3://bucket4/data4")
+dataset5 = Dataset(uri="s3://bucket5/data5")
+
+test_cases = [
+    (lambda: dataset1, dataset1),
+    (lambda: dataset1 & dataset2, DatasetAll(dataset1, dataset2)),
+    (lambda: dataset1 | dataset2, DatasetAny(dataset1, dataset2)),
+    (lambda: dataset1 | (dataset2 & dataset3), DatasetAny(dataset1, 
DatasetAll(dataset2, dataset3))),
+    (lambda: dataset1 | dataset2 & dataset3, DatasetAny(dataset1, 
DatasetAll(dataset2, dataset3))),
+    (
+        lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5),
+        DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), 
DatasetAny(dataset4, dataset5)),
+    ),
+    (lambda: dataset1 & dataset2 | dataset3, DatasetAny(DatasetAll(dataset1, 
dataset2), dataset3)),
+    (
+        lambda: (dataset1 | dataset2) & (dataset3 | dataset4),
+        DatasetAll(DatasetAny(dataset1, dataset2), DatasetAny(dataset3, 
dataset4)),
+    ),
+    (
+        lambda: (dataset1 & dataset2) | (dataset3 & (dataset4 | dataset5)),
+        DatasetAny(DatasetAll(dataset1, dataset2), DatasetAll(dataset3, 
DatasetAny(dataset4, dataset5))),
+    ),
+    (
+        lambda: (dataset1 & dataset2) & (dataset3 & dataset4),
+        DatasetAll(dataset1, dataset2, DatasetAll(dataset3, dataset4)),
+    ),
+    (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, 
dataset3)),

Review Comment:
   ```suggestion
       (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, 
dataset3)),
       (lambda: dataset1 & dataset2 & dataset3, DatasetAll(dataset1, dataset2, 
dataset3)),
   ```
   
   Probably worth having coverage for and also.



##########
airflow/datasets/__init__.py:
##########
@@ -73,6 +73,12 @@ def __eq__(self, other):
     def __hash__(self):
         return hash(self.uri)
 
+    def __or__(self, other: Dataset):
+        return DatasetAny(self, other)
+
+    def __and__(self, other: Dataset):

Review Comment:
   ```suggestion
       def __and__(self, other: Dataset | DatasetAny | DatasetAll) -> 
DatasetAll:
   ```



##########
docs/apache-airflow/authoring-and-scheduling/timetable.rst:
##########
@@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``:
 
 In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC 
based on the ``CronTriggerTimetable``, and it is also triggered by updates to 
``dag1_dataset``.
 
-Future Enhancements
-~~~~~~~~~~~~~~~~~~~
-Future iterations may introduce more complex combinations for scheduling 
(e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility 
for scheduling DAGs in various scenarios.
+Integrate conditional dataset with Time-Based Scheduling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Combining conditional dataset expressions with time-based schedules enhances 
scheduling flexibility:
+
+.. code-block:: python
+
+    from airflow.timetables import DatasetOrTimeSchedule
+    from airflow.timetables.trigger import CronTriggerTimetable
+
+    with DAG(
+        dag_id="conditional_dataset_and_time_based_timetable",
+        catchup=False,
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=DatasetOrTimeSchedule(
+            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), 
datasets=(dag1_dataset & dag2_dataset)
+        ),
+        tags=["dataset-time-based-timetable"],
+    ) as dag:

Review Comment:
   ```suggestion
       ):
   ```



##########
airflow/datasets/__init__.py:
##########
@@ -106,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None:
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(*self.objects, other)
+        return NotImplemented
+
+    def __and__(self, other):

Review Comment:
   ```suggestion
       def __and__(self, other: Dataset | DatasetAny | DatasetAll) -> 
DatasetAll:
   ```



##########
airflow/datasets/__init__.py:
##########
@@ -106,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None:
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):

Review Comment:
   ```suggestion
       def __or__(self, other: Dataset | DatasetAny | DatasetAll) -> DatasetAny:
   ```



##########
airflow/datasets/__init__.py:
##########
@@ -106,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
+    def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None:
+        """Initialize with one or more Dataset, DatasetAny, or DatasetAll 
instances."""
+        super().__init__(*objects)
+
+    def __or__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAny(*self.objects, other)
+        return NotImplemented
+
+    def __and__(self, other):
+        if isinstance(other, (Dataset, DatasetAny, DatasetAll)):
+            return DatasetAll(self, other)

Review Comment:
   Why is `or` using `*self.objects`, but this is using `self`? (Maybe that 
missing test will highlight this?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to