jason810496 commented on code in PR #62232:
URL: https://github.com/apache/airflow/pull/62232#discussion_r2838857217


##########
providers/common/ai/docs/operators/analytics.rst:
##########
@@ -0,0 +1,82 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Analytics Operator
+===================
+
+The Analytics operator is designed to run analytic queries on data stored in various datastores. It is a generic operator that can query data in S3, GCS, Azure, and Local File System.
+
+The Analytics Operator uses Apache DataFusion as its query engine and supports SQL as the query language. It operates on a single node engine to deliver high-performance analytics on the data. It can be used for various analytics tasks such as data exploration, data aggregation, and more. `<https://datafusion.apache.org/>`_.
+
+
+When to Use Analytics Operator
+------------------------------
+
+Analytics Operator is suitable for running analytics on large volumes of datasets, with performance and efficiency. Under the hood, it uses Apache DataFusion, a high-performance, extensible query engine for Apache Arrow, which enables fast SQL queries on various data formats and storage systems. DataFusion is chosen for its ability to handle large-scale data processing on a single node, providing low-latency analytics without the need for a full database setup and without the need for high compute clusters. For more on Analytics Operator with DataFusion use cases, see `<https://datafusion.apache.org/user-guide/introduction.html#use-cases>`_.
+
+
+Supported Storage Systems
+-------------------------
+- S3
+- Local File System

Review Comment:
   > So here we are registering all those object stores that supports by datafusion. https://datafusion.apache.org/python/autoapi/datafusion/object_store/index.html And few table providers Delta and Iceberg. not yet implemented in this PR
   
   If this is the case, could we link the https://datafusion.apache.org/python/autoapi/datafusion/object_store/index.html doc here and add a note for users that we don't yet support some of the object stores that DataFusion already supports?
   
   Since we only mention `S3` and `Local File System` here, it might be confusing for users.
   



##########
providers/common/ai/docs/operators/analytics.rst:
##########
@@ -0,0 +1,82 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Analytics Operator
+===================
+
+The Analytics operator is designed to run analytic queries on data stored in various datastores. It is a generic operator that can query data in S3, GCS, Azure, and Local File System.
+
+The Analytics Operator uses Apache DataFusion as its query engine and supports SQL as the query language. It operates on a single node engine to deliver high-performance analytics on the data. It can be used for various analytics tasks such as data exploration, data aggregation, and more. `<https://datafusion.apache.org/>`_.
+
+
+When to Use Analytics Operator
+------------------------------
+
+Analytics Operator is suitable for running analytics on large volumes of datasets, with performance and efficiency. Under the hood, it uses Apache DataFusion, a high-performance, extensible query engine for Apache Arrow, which enables fast SQL queries on various data formats and storage systems. DataFusion is chosen for its ability to handle large-scale data processing on a single node, providing low-latency analytics without the need for a full database setup and without the need for high compute clusters. For more on Analytics Operator with DataFusion use cases, see `<https://datafusion.apache.org/user-guide/introduction.html#use-cases>`_.

Review Comment:
   The content of this paragraph seems almost identical to the previous paragraph.



##########
providers/common/ai/src/airflow/providers/common/ai/datafusion/engine.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+from datafusion import SessionContext
+
+from airflow.providers.common.ai.datafusion.format_handlers import get_format_handler
+from airflow.providers.common.ai.datafusion.object_storage_provider import ObjectStorageProviderFactory
+from airflow.providers.common.ai.exceptions import ObjectStoreCreationException, QueryExecutionException
+from airflow.providers.common.ai.utils.config import ConnectionConfig, DataSourceConfig
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DataFusionEngine(LoggingMixin):
+    """Apache DataFusion engine."""
+
+    def __init__(self):
+        super().__init__()
+        # TODO: session context has additional parameters via SessionConfig see what's possible we can use Possible via DataFusionHook ?
+        self.df_ctx = SessionContext()
+        self.registered_tables: dict[str, str] = {}
+
+    @cached_property
+    def session_context(self) -> SessionContext:
+        """Return the session context."""
+        return self.df_ctx

Review Comment:
   May I ask why we need another `cached_property` for `SessionContext`? Since we aren't using a Hook here, the `cached_property` seems unnecessary to me.



##########
providers/common/ai/src/airflow/providers/common/ai/datafusion/engine.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+from datafusion import SessionContext
+
+from airflow.providers.common.ai.datafusion.format_handlers import get_format_handler
+from airflow.providers.common.ai.datafusion.object_storage_provider import ObjectStorageProviderFactory
+from airflow.providers.common.ai.exceptions import ObjectStoreCreationException, QueryExecutionException
+from airflow.providers.common.ai.utils.config import ConnectionConfig, DataSourceConfig
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DataFusionEngine(LoggingMixin):
+    """Apache DataFusion engine."""
+
+    def __init__(self):
+        super().__init__()
+        # TODO: session context has additional parameters via SessionConfig see what's possible we can use Possible via DataFusionHook ?

Review Comment:
   Does this mean that, as a long-term goal, we will split DataFusion out into an independent provider (since we need a Hook for DataFusion), and the Common AI provider will depend on the DataFusion provider?



##########
providers/common/ai/src/airflow/providers/common/ai/utils/config.py:
##########
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+

Review Comment:
   Not sure whether it would be more straightforward for users if we moved the `config` module to the parent module:
   
   ```python
   from airflow.providers.common.ai.utils.config import DataSourceConfig, FormatType
   from airflow.providers.common.ai.config import DataSourceConfig, FormatType
   ```



##########
providers/common/ai/src/airflow/providers/common/ai/utils/mixins.py:
##########
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook

Review Comment:
   How about lazily importing `AwsGenericHook` instead of importing it directly, since we mark `amazon` as an optional dependency?



##########
providers/common/ai/src/airflow/providers/common/ai/operators/analytics.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from collections.abc import Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Literal
+
+from airflow.providers.common.ai.datafusion.engine import DataFusionEngine
+from airflow.providers.common.ai.utils.mixins import CommonAIHookMixin
+from airflow.sdk import BaseOperator, Context
+
+if TYPE_CHECKING:
+    from airflow.providers.common.ai.utils.config import DataSourceConfig
+
+
+class AnalyticsOperator(BaseOperator, CommonAIHookMixin):
+    """
+    Operator to run queries on various datasource's stored in object stores like S3, GCS, Azure, etc.
+
+    :param datasource_configs: List of datasource configurations to register.
+    :param queries: List of SQL queries to execute.
+    :param max_rows_check: Maximum number of rows allowed in query results. Queries exceeding this will be skipped.
+    :param engine: Optional DataFusion engine instance.
+    :param result_output_format: List of output formats for results. Supported: 'tabulate', 'json'. Default is 'tabulate'.
+    """
+
+    template_fields: Sequence[str] = (
+        "datasource_configs",
+        "queries",
+        "max_rows_check",
+        "result_output_format",
+    )
+
+    def __init__(
+        self,
+        datasource_configs: list[DataSourceConfig],
+        queries: list[str],
+        max_rows_check: int = 100,
+        engine: DataFusionEngine | None = None,
+        result_output_format: Literal["tabulate", "json"] = "tabulate",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.datasource_configs = datasource_configs
+        self.queries = queries
+        self.engine = engine
+        self.max_rows_check = max_rows_check
+        self.result_output_format = result_output_format
+
+    @cached_property
+    def _df_engine(self):
+        if self.engine is None:
+            return DataFusionEngine()
+        return self.engine
+
+    def execute(self, context: Context) -> Any:

Review Comment:
   ```suggestion
       def execute(self, context: Context) -> str:
   ```



##########
providers/common/ai/src/airflow/providers/common/ai/utils/config.py:
##########
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any
+
+
+@dataclass(frozen=True)
+class ConnectionConfig:
+    """Configuration for datafusion object store connections."""
+
+    conn_id: str
+    credentials: dict[str, Any] = field(default_factory=dict)
+    extra_config: dict[str, Any] = field(default_factory=dict)
+
+
+class FormatType(str, Enum):
+    """Supported data formats."""
+
+    PARQUET = "parquet"
+    CSV = "csv"
+    JSON = "json"
+    AVRO = "avro"
+
+
+class StorageType(str, Enum):
+    """Storage types for Data Fusion."""
+
+    GCS = "gcs"
+    S3 = "s3"
+    AZURE = "azure"
+    LOCAL = "local"
+    HTTP = "http"
+
+
+@dataclass

Review Comment:
   ```suggestion
   @dataclass(frozen=True)
   ```
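   For context on why `frozen=True` helps here, a minimal illustration (`ExampleConfig` and its fields are made up for the example, not the PR's `DataSourceConfig`):

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ExampleConfig:
    """Illustrative frozen config: immutable after construction."""

    name: str
    format: str = "parquet"


cfg = ExampleConfig(name="sales")
try:
    cfg.format = "csv"  # any assignment on a frozen instance raises
except FrozenInstanceError:
    pass

# frozen (with the default eq=True) also makes instances hashable,
# so configs can be used as dict keys or deduplicated in sets
registry = {cfg: "registered"}
```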



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
