This is an automated email from the ASF dual-hosted git repository.

jiadongb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 09f04c8f0b feat(amber): add RESTCatalog support for result storage (#4272)
09f04c8f0b is described below

commit 09f04c8f0badc34ddde6dbe4ceb930faa3c3f2be
Author: Meng Wang <[email protected]>
AuthorDate: Wed Apr 1 16:12:35 2026 -0700

    feat(amber): add RESTCatalog support for result storage (#4272)
    
    ### What changes were proposed in this PR?
    
    This PR adds REST catalog configuration for the Iceberg-based result
    storage. After this PR, users can switch the Iceberg catalog to any
    RESTful service that implements the Iceberg REST catalog API by changing
    the configuration.
    
    
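    **Scala changes:**
    - `IcebergUtil.scala`: added `createRestCatalog()` for REST catalog
    connections using S3FileIO (MinIO/S3), and automatic namespace creation
    for every catalog that implements `SupportsNamespaces`
    - `IcebergCatalogInstance.scala`: the REST branch of the singleton now
    passes the configured REST warehouse name instead of the local file
    storage path
    - `IcebergTableWriter.scala`: data-file paths are built by string
    concatenation instead of `java.nio.file.Paths`, which collapses the `//`
    after the scheme in object-store locations such as `s3://` URIs
    - `StorageConfig.scala` / `EnvironmentalVariable.scala`: read the REST
    catalog URI and warehouse name from the new `storage.iceberg.catalog.rest`
    section, and added the `STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME`
    environment variable
    - `storage.conf`: added a REST catalog section (URI, warehouse name,
    region, S3 bucket); the default catalog type remains `postgres` for
    backward compatibility
    - `build.sbt`: added `iceberg-aws` and the AWS SDK `sts` module, excluded
    Netty from the AWS SDK artifacts, and pinned additional Netty modules to
    `nettyVersion` for Arrow compatibility
    - `PythonWorkflowWorker.scala` / `ComputingUnitManagingResource.scala`:
    propagate the REST catalog settings (and, for Python workers, the catalog
    type) to Python workers and computing units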
    
    **Python changes:**
    - `iceberg_catalog_instance.py` / `iceberg_utils.py`: added REST catalog
    support via PyIceberg's `load_catalog`
    - `storage_config.py`: added the catalog type and REST catalog settings
    (URI, warehouse name)
    - `texera_run_python_worker.py`: accept the catalog type and REST catalog
    settings from the Scala side as command-line arguments
    - `requirements.txt`: added `s3fs`, `aiobotocore`, and `botocore` for S3
    access (PyIceberg remains pinned at 0.11.1)
    
    
    ### Any related issues, documentation, discussions?
    
    Closes #4340
    
    ### How was this PR tested?
    
    Tested manually.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    co-authored with Claude
    
    ---------
    
    Signed-off-by: Meng Wang <[email protected]>
    Co-authored-by: Jiadong Bai <[email protected]>
---
 amber/requirements.txt                             |  3 ++
 .../storage/iceberg/iceberg_catalog_instance.py    | 36 +++++++++++++-----
 .../python/core/storage/iceberg/iceberg_utils.py   | 40 +++++++++++++++++++-
 .../core/storage/iceberg/test_iceberg_document.py  |  3 ++
 .../src/main/python/core/storage/storage_config.py | 12 +++++-
 .../pytexera/storage/test_large_binary_manager.py  |  3 ++
 amber/src/main/python/texera_run_python_worker.py  |  6 +++
 .../pythonworker/PythonWorkflowWorker.scala        |  3 ++
 common/config/src/main/resources/storage.conf      | 14 +++++++
 .../amber/config/EnvironmentalVariable.scala       |  1 +
 .../apache/texera/amber/config/StorageConfig.scala |  4 +-
 common/workflow-core/build.sbt                     | 19 +++++++++-
 .../core/storage/IcebergCatalogInstance.scala      |  2 +-
 .../result/iceberg/IcebergTableWriter.scala        |  6 +--
 .../org/apache/texera/amber/util/IcebergUtil.scala | 44 +++++++++++++++++-----
 .../resource/ComputingUnitManagingResource.scala   |  2 +
 16 files changed, 171 insertions(+), 27 deletions(-)

diff --git a/amber/requirements.txt b/amber/requirements.txt
index e972830363..b545fd9654 100644
--- a/amber/requirements.txt
+++ b/amber/requirements.txt
@@ -43,6 +43,9 @@ bidict==0.22.0
 cached_property==1.5.2
 psutil==5.9.0
 tzlocal==2.1
+s3fs==2025.9.0
+aiobotocore==2.25.1
+botocore==1.40.53
 pyiceberg==0.11.1
 readerwriterlock==1.0.9
 tenacity==8.5.0
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py
index b1478fadf0..0059808f9f 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py
@@ -18,14 +18,17 @@
 from pyiceberg.catalog import Catalog
 from typing import Optional
 
-from core.storage.iceberg.iceberg_utils import create_postgres_catalog
+from core.storage.iceberg.iceberg_utils import (
+    create_postgres_catalog,
+    create_rest_catalog,
+)
 from core.storage.storage_config import StorageConfig
 
 
 class IcebergCatalogInstance:
     """
     IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
-    Currently only postgres SQL catalog is supported.
+    Supports postgres SQL catalog and REST catalog.
     - Provides a single shared catalog for all Iceberg table-related operations.
     - Lazily initializes the catalog on first access.
     - Supports replacing the catalog instance for testing or reconfiguration.
@@ -39,16 +42,31 @@ class IcebergCatalogInstance:
         Retrieves the singleton Iceberg catalog instance.
         - If the catalog is not initialized, it is lazily created using the configured
         properties.
+        - Supports "postgres" and "rest" catalog types.
         :return: the Iceberg catalog instance.
         """
         if cls._instance is None:
-            cls._instance = create_postgres_catalog(
-                "texera_iceberg",
-                StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
-                StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
-                StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
-                StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
-            )
+            catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE
+            if catalog_type == "postgres":
+                cls._instance = create_postgres_catalog(
+                    "texera_iceberg",
+                    StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
+                    StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
+                    StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
+                    StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
+                )
+            elif catalog_type == "rest":
+                cls._instance = create_rest_catalog(
+                    "texera_iceberg",
+                    StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME,
+                    StorageConfig.ICEBERG_REST_CATALOG_URI,
+                    StorageConfig.S3_ENDPOINT,
+                    StorageConfig.S3_REGION,
+                    StorageConfig.S3_AUTH_USERNAME,
+                    StorageConfig.S3_AUTH_PASSWORD,
+                )
+            else:
+                raise ValueError(f"Unsupported catalog type: {catalog_type}")
         return cls._instance
 
     @classmethod
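A note on `get_instance()` above: the `if`/`elif` chain picks a factory based on `StorageConfig.ICEBERG_CATALOG_TYPE` and caches the result. The following is a minimal table-driven sketch of the same lazy-singleton dispatch. It assumes hypothetical factories (`make_postgres_catalog`, `make_rest_catalog`) that return placeholder strings instead of real PyIceberg `Catalog` objects; `CatalogHolder` and `reset()` are likewise illustrative names, not part of the codebase.

```python
from typing import Callable, Dict, Optional


# Hypothetical stand-ins for create_postgres_catalog / create_rest_catalog.
# They return placeholder strings instead of real pyiceberg Catalog objects.
def make_postgres_catalog() -> str:
    return "postgres-catalog"


def make_rest_catalog() -> str:
    return "rest-catalog"


# One entry per supported catalog type; unknown types have no entry.
CATALOG_FACTORIES: Dict[str, Callable[[], str]] = {
    "postgres": make_postgres_catalog,
    "rest": make_rest_catalog,
}


class CatalogHolder:
    """Hypothetical lazy singleton mirroring IcebergCatalogInstance."""

    _instance: Optional[str] = None

    @classmethod
    def get_instance(cls, catalog_type: str) -> str:
        # Create the catalog only on first access; later calls reuse it.
        if cls._instance is None:
            factory = CATALOG_FACTORIES.get(catalog_type)
            if factory is None:
                raise ValueError(f"Unsupported catalog type: {catalog_type}")
            cls._instance = factory()
        return cls._instance

    @classmethod
    def reset(cls) -> None:
        # Drop the cached instance, e.g. between tests.
        cls._instance = None
```

With this shape, supporting another catalog type means adding one dictionary entry. As in the original, the first successful call fixes the instance for the lifetime of the process.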
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
index 9e17b2e0e8..f973c72fe8 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
@@ -17,7 +17,7 @@
 
 import pyarrow as pa
 import pyiceberg.table
-from pyiceberg.catalog import Catalog
+from pyiceberg.catalog import Catalog, load_catalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.expressions import AlwaysTrue
 from pyiceberg.io.pyarrow import ArrowScan
@@ -153,6 +153,44 @@ def create_postgres_catalog(
     )
 
 
+def create_rest_catalog(
+    catalog_name: str,
+    warehouse_name: str,
+    rest_uri: str,
+    s3_endpoint: str,
+    s3_region: str,
+    s3_username: str,
+    s3_password: str,
+) -> Catalog:
+    """
+    Creates a REST catalog instance by connecting to a REST endpoint.
+    - Configures the catalog to interact with a REST endpoint.
+    - The warehouse_name parameter specifies the warehouse identifier.
+    - Configures S3FileIO for MinIO/S3 storage backend.
+    :param catalog_name: the name of the catalog.
+    :param warehouse_name: the warehouse identifier.
+    :param rest_uri: the URI of the REST catalog endpoint.
+    :param s3_endpoint: the S3 endpoint URL.
+    :param s3_region: the S3 region.
+    :param s3_username: the S3 access key ID.
+    :param s3_password: the S3 secret access key.
+    :return: a Catalog instance (REST catalog).
+    """
+    return load_catalog(
+        catalog_name,
+        **{
+            "type": "rest",
+            "uri": rest_uri,
+            "warehouse": warehouse_name,
+            "s3.endpoint": s3_endpoint,
+            "s3.access-key-id": s3_username,
+            "s3.secret-access-key": s3_password,
+            "s3.region": s3_region,
+            "s3.path-style-access": "true",
+        },
+    )
+
+
 def create_table(
     catalog: Catalog,
     table_namespace: str,
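Both `create_rest_catalog()` here and `createRestCatalog()` in `IcebergUtil.scala` set `s3.path-style-access` to `"true"`. The sketch below shows why that matters for a local MinIO endpoint. With path-style addressing the bucket is part of the URL path; with virtual-hosted style it becomes part of the host name (e.g. `texera-iceberg.localhost:9000`), which a local setup usually cannot resolve. `object_url()` is a hypothetical helper: it assumes the endpoint has no path component, and it only builds URLs without contacting S3.

```python
from urllib.parse import urlsplit, urlunsplit


def object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    """Build the URL an S3 client would request for bucket/key (hypothetical helper)."""
    parts = urlsplit(endpoint)
    key = key.lstrip("/")
    if path_style:
        # Path-style: <scheme>://<endpoint>/<bucket>/<key>
        return urlunsplit((parts.scheme, parts.netloc, f"/{bucket}/{key}", "", ""))
    # Virtual-hosted style: <scheme>://<bucket>.<endpoint>/<key>
    return urlunsplit((parts.scheme, f"{bucket}.{parts.netloc}", f"/{key}", "", ""))
```

For example, `object_url("http://localhost:9000", "texera-iceberg", "ns/f.parquet", True)` keeps the request on `localhost:9000`, while `path_style=False` would require DNS for `texera-iceberg.localhost`.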
diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 34711beb65..9b374f7d5c 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -37,9 +37,12 @@ from proto.org.apache.texera.amber.core import (
 
 # Hardcoded storage config only for test purposes.
 StorageConfig.initialize(
+    catalog_type="postgres",
     postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog",
     postgres_username="texera",
     postgres_password="password",
+    rest_catalog_uri="http://localhost:8181/catalog/",
+    rest_catalog_warehouse_name="texera",
     table_result_namespace="operator-port-result",
     directory_path="../../../../../../amber/user-resources/workflow-results",
     commit_batch_size=4096,
diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py
index c55495ea14..0e47bdb71a 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -25,14 +25,17 @@ class StorageConfig:
 
     _initialized = False
 
+    ICEBERG_CATALOG_TYPE = None
     ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
     ICEBERG_POSTGRES_CATALOG_USERNAME = None
     ICEBERG_POSTGRES_CATALOG_PASSWORD = None
+    ICEBERG_REST_CATALOG_URI = None
+    ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
     ICEBERG_TABLE_RESULT_NAMESPACE = None
     ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
     ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
 
-    # S3 configs (for large_binary_manager module)
+    # S3 configs
     S3_ENDPOINT = None
     S3_REGION = None
     S3_AUTH_USERNAME = None
@@ -41,9 +44,12 @@ class StorageConfig:
     @classmethod
     def initialize(
         cls,
+        catalog_type,
         postgres_uri_without_scheme,
         postgres_username,
         postgres_password,
+        rest_catalog_uri,
+        rest_catalog_warehouse_name,
         table_result_namespace,
         directory_path,
         commit_batch_size,
@@ -57,9 +63,13 @@ class StorageConfig:
                 "Storage config has already been initialized and cannot be modified."
             )
 
+        cls.ICEBERG_CATALOG_TYPE = catalog_type
         cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
         cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
         cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
+        cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri
+        cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name
+
         cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
         cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
         cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
index a657f244f3..64c7080e52 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
@@ -27,9 +27,12 @@ class TestLargeBinaryManager:
         """Initialize StorageConfig for tests."""
         if not StorageConfig._initialized:
             StorageConfig.initialize(
+                catalog_type="postgres",
                 postgres_uri_without_scheme="localhost:5432/test",
                 postgres_username="test",
                 postgres_password="test",
+                rest_catalog_uri="http://localhost:8181/catalog/",
+                rest_catalog_warehouse_name="texera",
                 table_result_namespace="test",
                 directory_path="/tmp/test",
                 commit_batch_size=1000,
diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py
index 3ebf81c201..8687298f81 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -45,9 +45,12 @@ if __name__ == "__main__":
         output_port,
         logger_level,
         r_path,
+        iceberg_catalog_type,
         iceberg_postgres_catalog_uri_without_scheme,
         iceberg_postgres_catalog_username,
         iceberg_postgres_catalog_password,
+        iceberg_rest_catalog_uri,
+        iceberg_rest_catalog_warehouse_name,
         iceberg_table_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
@@ -58,9 +61,12 @@ if __name__ == "__main__":
     ) = sys.argv
     init_loguru_logger(logger_level)
     StorageConfig.initialize(
+        iceberg_catalog_type,
         iceberg_postgres_catalog_uri_without_scheme,
         iceberg_postgres_catalog_username,
         iceberg_postgres_catalog_password,
+        iceberg_rest_catalog_uri,
+        iceberg_rest_catalog_warehouse_name,
         iceberg_table_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index 558b99c9b7..d2bc5f5025 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -178,9 +178,12 @@ class PythonWorkflowWorker(
         Integer.toString(pythonProxyServer.getPortNumber.get()),
         UdfConfig.pythonLogStreamHandlerLevel,
         RENVPath,
+        StorageConfig.icebergCatalogType,
         StorageConfig.icebergPostgresCatalogUriWithoutScheme,
         StorageConfig.icebergPostgresCatalogUsername,
         StorageConfig.icebergPostgresCatalogPassword,
+        StorageConfig.icebergRESTCatalogUri,
+        StorageConfig.icebergRESTCatalogWarehouseName,
         StorageConfig.icebergTableResultNamespace,
         StorageConfig.fileStorageDirectoryPath.toString,
         StorageConfig.icebergTableCommitBatchSize.toString,
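The Scala side passes these settings to `texera_run_python_worker.py` as positional command-line arguments, and the Python side unpacks `sys.argv` in the same order, so the three new values had to be inserted at matching positions on both sides. A minimal sketch, assuming a hypothetical `IcebergArgs` record that covers only the Iceberg-related slice of the argument list, of how a named record makes that ordering explicit and fails fast with a readable message when the counts disagree:

```python
from typing import List, NamedTuple


class IcebergArgs(NamedTuple):
    """Hypothetical view of the Iceberg slice of the worker's argv.

    Field order must match the order in which PythonWorkflowWorker.scala
    appends the corresponding StorageConfig values.
    """

    catalog_type: str
    postgres_uri_without_scheme: str
    postgres_username: str
    postgres_password: str
    rest_catalog_uri: str
    rest_catalog_warehouse_name: str
    table_namespace: str


def parse_iceberg_args(args: List[str]) -> IcebergArgs:
    expected = len(IcebergArgs._fields)
    if len(args) != expected:
        # A count mismatch usually means one side gained an argument the other lacks.
        raise ValueError(f"expected {expected} Iceberg arguments, got {len(args)}")
    return IcebergArgs(*args)
```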
diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf
index 276d1491cd..cf2e23341b 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -27,6 +27,20 @@ storage {
             rest-uri = ""
             rest-uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI} # the uri of the rest catalog, not needed unless using REST catalog
 
+            rest {
+                uri = "http://localhost:8181/catalog"
+                uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI}
+
+                warehouse-name = "texera"
+                warehouse-name = ${?STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME}
+
+                region = "us-west-2"
+                region = ${?STORAGE_ICEBERG_CATALOG_REST_REGION}
+
+                s3-bucket = "texera-iceberg"
+                s3-bucket = ${?STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET}
+            }
+
             postgres {
                 # do not include scheme in the uri as Python and Java use different schemes
                 uri-without-scheme = "localhost:5432/texera_iceberg_catalog"
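Each setting in the new `rest` block is written twice: a default, then `${?VAR}`. In HOCON the second assignment applies only when the environment variable is defined, so the environment can override the default without being required. A rough Python approximation of that resolution rule; the helper name `resolve_setting` is hypothetical, and it ignores HOCON's lookup of config paths before environment variables:

```python
import os
from typing import Mapping, Optional


def resolve_setting(default: str, env_var: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Approximate `key = default` followed by `key = ${?env_var}`."""
    env = os.environ if environ is None else environ
    # If env_var is present in the mapping it wins; otherwise keep the default.
    return env.get(env_var, default)
```

Note that `STORAGE_ICEBERG_CATALOG_REST_URI` now feeds both the legacy `rest-uri` key and the new `rest.uri` key; after this change `StorageConfig.scala` reads only `rest.uri`.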
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
index 1adc323305..339b57f52a 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
@@ -52,6 +52,7 @@ object EnvironmentalVariable {
   // Iceberg Catalog
   val ENV_ICEBERG_CATALOG_TYPE = "STORAGE_ICEBERG_CATALOG_TYPE"
   val ENV_ICEBERG_CATALOG_REST_URI = "STORAGE_ICEBERG_CATALOG_REST_URI"
+  val ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME = "STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME"
 
   // Iceberg Postgres Catalog
   val ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME =
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 3bc1e05a9b..728e3c0c2d 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -36,7 +36,9 @@ object StorageConfig {
 
   // Iceberg specifics
   val icebergCatalogType: String = conf.getString("storage.iceberg.catalog.type")
-  val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest-uri")
+  val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest.uri")
+  val icebergRESTCatalogWarehouseName: String =
+    conf.getString("storage.iceberg.catalog.rest.warehouse-name")
 
   // Iceberg Postgres specifics
   val icebergPostgresCatalogUriWithoutScheme: String =
diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt
index db91668513..4f9c37b171 100644
--- a/common/workflow-core/build.sbt
+++ b/common/workflow-core/build.sbt
@@ -134,10 +134,14 @@ dependencyOverrides ++= Seq(
   "io.netty" % "netty-codec" % nettyVersion,
   "io.netty" % "netty-codec-http" % nettyVersion,
   "io.netty" % "netty-codec-http2" % nettyVersion,
+  "io.netty" % "netty-codec-socks" % nettyVersion,
   "io.netty" % "netty-common" % nettyVersion,
   "io.netty" % "netty-handler" % nettyVersion,
+  "io.netty" % "netty-handler-proxy" % nettyVersion,
   "io.netty" % "netty-resolver" % nettyVersion,
   "io.netty" % "netty-transport" % nettyVersion,
+  "io.netty" % "netty-transport-classes-epoll" % nettyVersion,
+  "io.netty" % "netty-transport-native-epoll" % nettyVersion,
   "io.netty" % "netty-transport-native-unix-common" % nettyVersion
 )
 
@@ -167,6 +171,10 @@ libraryDependencies ++= Seq(
     excludeJackson,
     excludeJacksonModule
   ),
+  "org.apache.iceberg" % "iceberg-aws" % "1.7.1" excludeAll(
+    excludeJackson,
+    excludeJacksonModule
+  ),
   "org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
     excludeXmlBind,
     excludeGlassfishJersey,
@@ -208,6 +216,13 @@ libraryDependencies ++= Seq(
   "software.amazon.awssdk" % "s3" % "2.29.51" excludeAll(
     ExclusionRule(organization = "io.netty")
   ),
-  "software.amazon.awssdk" % "auth" % "2.29.51",
-  "software.amazon.awssdk" % "regions" % "2.29.51",
+  "software.amazon.awssdk" % "auth" % "2.29.51" excludeAll(
+    ExclusionRule(organization = "io.netty")
+  ),
+  "software.amazon.awssdk" % "regions" % "2.29.51" excludeAll(
+    ExclusionRule(organization = "io.netty")
+  ),
+  "software.amazon.awssdk" % "sts" % "2.29.51" excludeAll(
+    ExclusionRule(organization = "io.netty")
+  ),
 )
\ No newline at end of file
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala
index e3512874c9..bb9f2d8bf2 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala
@@ -52,7 +52,7 @@ object IcebergCatalogInstance {
           case "rest" =>
             IcebergUtil.createRestCatalog(
               "texera_iceberg",
-              StorageConfig.fileStorageDirectoryPath
+              StorageConfig.icebergRESTCatalogWarehouseName
             )
           case "postgres" =>
             IcebergUtil.createPostgresCatalog(
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala
index 549cb4b9d1..06d04e407f 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala
@@ -29,7 +29,6 @@ import org.apache.iceberg.io.{DataWriter, OutputFile}
 import org.apache.iceberg.parquet.Parquet
 import org.apache.iceberg.{Schema, Table}
 
-import java.nio.file.Paths
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -107,10 +106,11 @@ private[storage] class IcebergTableWriter[T](
   private def flushBuffer(): Unit = {
     if (buffer.nonEmpty) {
       // Create a unique file path using the writer's identifier and the filename index
-      val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}")
+      val location = table.location().stripSuffix("/")
+      val filepathString = s"$location/${writerIdentifier}_$filenameIdx"
       // Increment the filename index by 1
       filenameIdx += 1
-      val outputFile: OutputFile = table.io().newOutputFile(filepath.toString)
+      val outputFile: OutputFile = table.io().newOutputFile(filepathString)
       // Create a Parquet data writer to write a new file
       val dataWriter: DataWriter[Record] = Parquet
         .writeData(outputFile)
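The `flushBuffer()` change above replaces `Paths.get(...).resolve(...)` with plain string concatenation. On Unix-like systems `java.nio.file.Paths` normalizes redundant slashes, so a table location such as `s3://bucket/...` loses one slash after the scheme; for plain local paths the two approaches agree. Python's `PurePosixPath` behaves analogously and makes the effect easy to see. In the sketch below, `data_file_path()` is a hypothetical helper mirroring the new Scala logic (strip one trailing `/`, then append `<writerIdentifier>_<filenameIdx>`).

```python
from pathlib import PurePosixPath


def data_file_path(table_location: str, writer_id: str, file_idx: int) -> str:
    """Mirror of the new Scala logic: stripSuffix("/") then join with "/"."""
    location = table_location[:-1] if table_location.endswith("/") else table_location
    return f"{location}/{writer_id}_{file_idx}"


# Path-based joining collapses the "//" that follows the URI scheme.
collapsed = str(PurePosixPath("s3://bucket/warehouse/t") / "w1_0")
# String joining keeps the object-store URI intact.
preserved = data_file_path("s3://bucket/warehouse/t/", "w1", 0)
```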
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
index ad6ac07c1f..d6e406a53f 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
@@ -22,9 +22,10 @@ package org.apache.texera.amber.util
 import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, LargeBinary, Schema, Tuple}
 import org.apache.hadoop.conf.Configuration
-import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+import org.apache.iceberg.catalog.{Catalog, SupportsNamespaces, TableIdentifier}
 import org.apache.iceberg.data.parquet.GenericParquetReaders
 import org.apache.iceberg.data.{GenericRecord, Record}
+import org.apache.iceberg.aws.s3.S3FileIO
 import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO}
 import org.apache.iceberg.io.{CloseableIterable, InputFile}
 import org.apache.iceberg.jdbc.JdbcCatalog
@@ -40,6 +41,8 @@ import org.apache.iceberg.{
   TableProperties,
   Schema => IcebergSchema
 }
+import org.apache.iceberg.catalog.Namespace
+import org.apache.iceberg.exceptions.AlreadyExistsException
 
 import java.nio.ByteBuffer
 import java.nio.file.Path
@@ -101,17 +104,26 @@ object IcebergUtil {
     */
   def createRestCatalog(
       catalogName: String,
-      warehouse: Path
+      warehouse: String
   ): RESTCatalog = {
     val catalog = new RESTCatalog()
-    catalog.initialize(
-      catalogName,
-      Map(
-        "warehouse" -> warehouse.toString,
-        CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri,
-        CatalogProperties.FILE_IO_IMPL -> classOf[HadoopFileIO].getName
-      ).asJava
+
+    // Build base properties map
+    var properties = Map(
+      "warehouse" -> warehouse,
+      CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri
     )
+
+    properties = properties ++ Map(
+      CatalogProperties.FILE_IO_IMPL -> classOf[S3FileIO].getName,
+      "s3.endpoint" -> StorageConfig.s3Endpoint,
+      "s3.access-key-id" -> StorageConfig.s3Username,
+      "s3.secret-access-key" -> StorageConfig.s3Password,
+      "s3.region" -> StorageConfig.s3Region,
+      "s3.path-style-access" -> "true"
+    )
+
+    catalog.initialize(catalogName, properties.asJava)
     catalog
   }
 
@@ -165,6 +177,20 @@ object IcebergUtil {
       TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString
     )
 
+    val namespace = Namespace.of(tableNamespace)
+
+    catalog match {
+      case nsCatalog: SupportsNamespaces =>
+        try nsCatalog.createNamespace(namespace, Map.empty[String, String].asJava)
+        catch {
+          case _: AlreadyExistsException => ()
+        }
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Catalog ${catalog.getClass.getName} does not support namespaces"
+        )
+    }
+
     val identifier = TableIdentifier.of(tableNamespace, tableName)
     if (catalog.tableExists(identifier) && overrideIfExists) {
       catalog.dropTable(identifier)
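`createTable()` now creates the table's namespace before touching the table, and treats `AlreadyExistsException` as success instead of checking for existence first. That makes the call idempotent and avoids a check-then-create race when several writers create tables in the same namespace concurrently. A minimal Python sketch of the same pattern, assuming a hypothetical in-memory catalog and exception type (these are not PyIceberg or Iceberg Java classes):

```python
from typing import Dict, Set


class NamespaceAlreadyExists(Exception):
    """Hypothetical stand-in for Iceberg's AlreadyExistsException."""


class InMemoryCatalog:
    """Hypothetical catalog that supports namespaces."""

    def __init__(self) -> None:
        self.namespaces: Set[str] = set()

    def create_namespace(self, name: str, properties: Dict[str, str]) -> None:
        if name in self.namespaces:
            raise NamespaceAlreadyExists(name)
        self.namespaces.add(name)


class NoNamespaceCatalog:
    """Hypothetical catalog without namespace support."""


def ensure_namespace(catalog: object, name: str) -> None:
    create = getattr(catalog, "create_namespace", None)
    if create is None:
        # Mirrors the IllegalArgumentException branch in the Scala code.
        raise ValueError(f"Catalog {type(catalog).__name__} does not support namespaces")
    try:
        create(name, {})
    except NamespaceAlreadyExists:
        pass  # Created earlier or by another writer: nothing to do.
```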
diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
index 1249d06783..9b214b9755 100644
--- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
@@ -69,6 +69,8 @@ object ComputingUnitManagingResource {
   private lazy val computingUnitEnvironmentVariables: Map[String, Any] = Map(
     // Variables for saving results to Iceberg
     EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> StorageConfig.icebergCatalogType,
+    EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI -> StorageConfig.icebergRESTCatalogUri,
+    EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME -> StorageConfig.icebergRESTCatalogWarehouseName,
     EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME -> StorageConfig.icebergPostgresCatalogUriWithoutScheme,
     EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME -> StorageConfig.icebergPostgresCatalogUsername,
     EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD -> StorageConfig.icebergPostgresCatalogPassword,
