gopidesupavan commented on code in PR #62754:
URL: https://github.com/apache/airflow/pull/62754#discussion_r2875635652
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py:
##########
@@ -16,98 +16,135 @@
# under the License.
from __future__ import annotations
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
-from airflow.providers.common.sql.config import FormatType
+from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+from airflow.providers.common.sql.config import DataSourceConfig, FormatType
from airflow.providers.common.sql.datafusion.base import FormatHandler
-from airflow.providers.common.sql.datafusion.exceptions import FileFormatRegistrationException
+from airflow.providers.common.sql.datafusion.exceptions import (
+ FileFormatRegistrationException,
+ IcebergRegistrationException,
+)
if TYPE_CHECKING:
from datafusion import SessionContext
class ParquetFormatHandler(FormatHandler):
- """
- Parquet format handler.
-
- :param options: Additional options for the Parquet format.
- https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_parquet
- """
-
- def __init__(self, options: dict[str, Any] | None = None):
- self.options = options or {}
+ """Parquet format handler."""
@property
def get_format(self) -> FormatType:
"""Return the format type."""
return FormatType.PARQUET
- def register_data_source_format(self, ctx: SessionContext, table_name: str, path: str):
+ def register_data_source_format(self, ctx: SessionContext):
"""Register a data source format."""
try:
- ctx.register_parquet(table_name, path, **self.options)
+ ctx.register_parquet(
+ self.datasource_config.table_name,
+ self.datasource_config.uri,
+ **self.datasource_config.options,
+ )
except Exception as e:
raise FileFormatRegistrationException(f"Failed to register Parquet data source: {e}")
class CsvFormatHandler(FormatHandler):
- """
- CSV format handler.
-
- :param options: Additional options for the CSV format.
- https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_csv
- """
-
- def __init__(self, options: dict[str, Any] | None = None):
- self.options = options or {}
+ """CSV format handler."""
@property
def get_format(self) -> FormatType:
"""Return the format type."""
return FormatType.CSV
- def register_data_source_format(self, ctx: SessionContext, table_name: str, path: str):
+ def register_data_source_format(self, ctx: SessionContext):
"""Register a data source format."""
try:
- ctx.register_csv(table_name, path, **self.options)
+ ctx.register_csv(
+ self.datasource_config.table_name,
+ self.datasource_config.uri,
+ **self.datasource_config.options,
+ )
except Exception as e:
raise FileFormatRegistrationException(f"Failed to register csv data source: {e}")
class AvroFormatHandler(FormatHandler):
- """
- Avro format handler.
-
- :param options: Additional options for the Avro format.
- https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_avro
- """
-
- def __init__(self, options: dict[str, Any] | None = None):
- self.options = options or {}
+ """Avro format handler."""
@property
def get_format(self) -> FormatType:
"""Return the format type."""
return FormatType.AVRO
- def register_data_source_format(self, ctx: SessionContext, table_name: str, path: str) -> None:
+ def register_data_source_format(self, ctx: SessionContext) -> None:
"""Register a data source format."""
try:
- ctx.register_avro(table_name, path, **self.options)
+ ctx.register_avro(
+ self.datasource_config.table_name,
+ self.datasource_config.uri,
+ **self.datasource_config.options,
+ )
except Exception as e:
raise FileFormatRegistrationException(f"Failed to register Avro data source: {e}")
-def get_format_handler(format_type: str, options: dict[str, Any] | None = None) -> FormatHandler:
+class IcebergFormatHandler(FormatHandler):
+ """
+ Iceberg format handler for DataFusion.
+
+ Loads an Iceberg table from a catalog using ``IcebergHook`` and registers
+ it with a DataFusion ``SessionContext`` via ``register_table_provider``.
+ """
+
+ @property
+ def get_format(self) -> FormatType:
+ """Return the format type."""
+ return FormatType.ICEBERG
+
+ def register_data_source_format(self, ctx: SessionContext) -> None:
+ """Register an Iceberg table with the DataFusion session context."""
+ try:
+ from airflow.providers.apache.iceberg.hooks.iceberg import IcebergHook
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "Iceberg format requires the apache-airflow-providers-apache-iceberg package. "
+ "Install it with: pip install 'apache-airflow-providers-apache-iceberg'"
+ )
+
+ try:
+ hook = IcebergHook(iceberg_conn_id=self.datasource_config.conn_id)
+ namespace_table = f"{self.datasource_config.db_name}.{self.datasource_config.table_name}"
Review Comment:
That's a good catch, let me update it :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]