gopidesupavan commented on code in PR #62754:
URL: https://github.com/apache/airflow/pull/62754#discussion_r2875746353


##########
providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py:
##########
@@ -36,47 +40,138 @@ def session_context_mock(self):
         return MagicMock()
 
     def test_parquet_handler_success(self, session_context_mock):
-        handler = ParquetFormatHandler(options={"key": "value"})
-        handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
+        datasource_config = DataSourceConfig(
+            table_name="table_name",
+            uri="file://path/to/file",
+            format="parquet",
+            conn_id="conn_id",
+            options={"key": "value"},
+        )
+        handler = ParquetFormatHandler(datasource_config)
+        handler.register_data_source_format(session_context_mock)
         session_context_mock.register_parquet.assert_called_once_with(
-            "table_name", "path/to/file", key="value"
+            "table_name", "file://path/to/file", key="value"
         )
         assert handler.get_format == FormatType.PARQUET
 
     def test_parquet_handler_failure(self, session_context_mock):
         session_context_mock.register_parquet.side_effect = Exception("Error")
-        handler = ParquetFormatHandler()
+        datasource_config = DataSourceConfig(
+            table_name="table_name", uri="file://path/to/file", format="parquet", conn_id="conn_id"
+        )
+        handler = ParquetFormatHandler(datasource_config)
         with pytest.raises(FileFormatRegistrationException, match="Failed to register Parquet data source"):
-            handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
+            handler.register_data_source_format(session_context_mock)
 
     def test_csv_handler_success(self, session_context_mock):
-        handler = CsvFormatHandler(options={"delimiter": ","})
-        handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
-        session_context_mock.register_csv.assert_called_once_with("table_name", "path/to/file", delimiter=",")
+        datasource_config = DataSourceConfig(
+            table_name="table_name",
+            uri="file://path/to/file",
+            format="csv",
+            conn_id="conn_id",
+            options={"delimiter": ","},
+        )
+        handler = CsvFormatHandler(datasource_config)
+        handler.register_data_source_format(session_context_mock)
+        session_context_mock.register_csv.assert_called_once_with(
+            "table_name", "file://path/to/file", delimiter=","
+        )
         assert handler.get_format == FormatType.CSV
 
     def test_csv_handler_failure(self, session_context_mock):
         session_context_mock.register_csv.side_effect = Exception("Error")
-        handler = CsvFormatHandler()
+        datasource_config = DataSourceConfig(
+            table_name="table_name", uri="file://path/to/file", format="csv", conn_id="conn_id"
+        )
+        handler = CsvFormatHandler(datasource_config)
         with pytest.raises(FileFormatRegistrationException, match="Failed to register csv data source"):
-            handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
+            handler.register_data_source_format(session_context_mock)
 
     def test_avro_handler_success(self, session_context_mock):
-        handler = AvroFormatHandler(options={"key": "value"})
-        handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
-        session_context_mock.register_avro.assert_called_once_with("table_name", "path/to/file", key="value")
+        datasource_config = DataSourceConfig(
+            table_name="table_name",
+            uri="file://path/to/file",
+            format="avro",
+            conn_id="conn_id",
+            options={"key": "value"},
+        )
+        handler = AvroFormatHandler(datasource_config)
+        handler.register_data_source_format(session_context_mock)
+        session_context_mock.register_avro.assert_called_once_with(
+            "table_name", "file://path/to/file", key="value"
+        )
         assert handler.get_format == FormatType.AVRO
 
     def test_avro_handler_failure(self, session_context_mock):
         session_context_mock.register_avro.side_effect = Exception("Error")
-        handler = AvroFormatHandler()
+        datasource_config = DataSourceConfig(
+            table_name="table_name", uri="file://path/to/file", format="avro", conn_id="conn_id"
+        )
+        handler = AvroFormatHandler(datasource_config)
         with pytest.raises(FileFormatRegistrationException, match="Failed to register Avro data source"):
-            handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
+            handler.register_data_source_format(session_context_mock)
+
+    @patch("airflow.providers.apache.iceberg.hooks.iceberg.IcebergHook")
+    def test_iceberg_handler_success(self, mock_iceberg_hook_cls, session_context_mock):
+        mock_hook = MagicMock()
+        mock_iceberg_hook_cls.return_value = mock_hook
+        mock_iceberg_table = MagicMock()
+        mock_iceberg_table.io.properties = {}
+        mock_hook.load_table.return_value = mock_iceberg_table
+        datasource_config = DataSourceConfig(
+            table_name="my_table",
+            format="iceberg",
+            conn_id="iceberg_default",
+            db_name="default",
+        )
+        handler = IcebergFormatHandler(datasource_config)
+        handler.register_data_source_format(session_context_mock)
+
+        mock_iceberg_hook_cls.assert_called_once_with(iceberg_conn_id="iceberg_default")
+        mock_hook.load_table.assert_called_once_with("default.my_table")
+        session_context_mock.register_table.assert_called_once_with("my_table", mock_iceberg_table)
+        assert handler.get_format == FormatType.ICEBERG
+
+    @patch("airflow.providers.apache.iceberg.hooks.iceberg.IcebergHook")
+    def test_iceberg_handler_failure(self, mock_iceberg_hook_cls, session_context_mock):
+        mock_hook = MagicMock()
+        mock_iceberg_hook_cls.return_value = mock_hook
+        mock_hook.load_table.side_effect = Exception("catalog error")
+        datasource_config = DataSourceConfig(
+            table_name="my_table", format="iceberg", conn_id="iceberg_default", db_name="default"
+        )
+        handler = IcebergFormatHandler(datasource_config)
+        with pytest.raises(IcebergRegistrationException, match="Failed to register Iceberg table"):
+            handler.register_data_source_format(session_context_mock)
+
+    def test_iceberg_handler_default_options(self):
+        datasource_config = DataSourceConfig(
+            table_name="my_table", format="iceberg", conn_id="iceberg_default"
+        )
+        handler = IcebergFormatHandler(datasource_config)
+        assert handler.datasource_config.options == {}
+        assert handler.datasource_config.conn_id == "iceberg_default"
+        assert handler.get_format == FormatType.ICEBERG
 
     def test_get_format_handler(self):
-        assert isinstance(get_format_handler("parquet"), ParquetFormatHandler)
-        assert isinstance(get_format_handler("csv"), CsvFormatHandler)
-        assert isinstance(get_format_handler("avro"), AvroFormatHandler)
+        assert isinstance(
+            get_format_handler(
+                DataSourceConfig(table_name="t", format="parquet", conn_id="c", uri="file://u")
+            ),
+            ParquetFormatHandler,
+        )
+        assert isinstance(
+            get_format_handler(DataSourceConfig(table_name="t", format="csv", conn_id="c", uri="file://u")),
+            CsvFormatHandler,
+        )
+        assert isinstance(
+            get_format_handler(DataSourceConfig(table_name="t", format="avro", conn_id="c", uri="file://u")),
+            AvroFormatHandler,
+        )
+        assert isinstance(
+            get_format_handler(DataSourceConfig(table_name="t", format="iceberg", conn_id="iceberg_default")),
+            IcebergFormatHandler,
+        )
 
-        with pytest.raises(ValueError, match="Unsupported format"):
-            get_format_handler("invalid")
+        with pytest.raises(ValueError, match="Unsupported storage type"):

Review Comment:
   Updated, good call :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to