This is an automated email from the ASF dual-hosted git repository.
jiadongb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 09f04c8f0b feat(amber): add RESTCatalog support for result storage (#4272)
09f04c8f0b is described below
commit 09f04c8f0badc34ddde6dbe4ceb930faa3c3f2be
Author: Meng Wang <[email protected]>
AuthorDate: Wed Apr 1 16:12:35 2026 -0700
feat(amber): add RESTCatalog support for result storage (#4272)
### What changes were proposed in this PR?
This PR adds REST catalog configuration for result storage on Apache
Iceberg. After this PR, users can change the configuration to switch the
catalog to a REST service that is compatible with the Iceberg
`RESTCatalog` definition.
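
As a rough illustration of the switch, the sketch below resolves the REST catalog settings the way the `storage.conf` entries in this commit appear to: a built-in default that an environment variable overrides when it is set. The helper name `resolve_rest_catalog_config` is hypothetical; the variable names and defaults are copied from the diff.

```python
import os
from typing import Dict, Mapping, Optional, Tuple

# Setting name -> (environment variable, default), as listed in storage.conf in this commit.
REST_DEFAULTS: Dict[str, Tuple[str, str]] = {
    "uri": ("STORAGE_ICEBERG_CATALOG_REST_URI", "http://localhost:8181/catalog"),
    "warehouse-name": ("STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME", "texera"),
    "region": ("STORAGE_ICEBERG_CATALOG_REST_REGION", "us-west-2"),
    "s3-bucket": ("STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET", "texera-iceberg"),
}


def resolve_rest_catalog_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: use the environment variable when set, else the default."""
    env = os.environ if env is None else env
    return {key: env.get(var, default) for key, (var, default) in REST_DEFAULTS.items()}


# With no overrides, the defaults from storage.conf apply.
defaults_only = resolve_rest_catalog_config({})

# Overriding one variable leaves the other settings at their defaults.
overridden = resolve_rest_catalog_config(
    {"STORAGE_ICEBERG_CATALOG_REST_URI": "http://catalog.example:8181/catalog"}
)
```

Selecting the REST catalog itself is done through `STORAGE_ICEBERG_CATALOG_TYPE`, which the diff reads as `storage.iceberg.catalog.type`.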
**Scala changes:**
- `IcebergUtil.scala`: updated `createRestCatalog()` to take a warehouse
name and connect through S3FileIO (MinIO/S3), and added namespace
auto-creation for all catalog types
- `IcebergCatalogInstance.scala`: the `rest` branch now passes the REST
warehouse name instead of the file storage directory path
- `IcebergTableWriter.scala`: builds data file paths by string
concatenation instead of `java.nio.file.Paths`, for REST catalog
compatibility
- `StorageConfig.scala` / `EnvironmentalVariable.scala`: added REST
catalog configuration (URI, warehouse name, region, S3 bucket) and
environment variable support
- `storage.conf`: added REST catalog config section (default remains
`postgres` for backward compatibility)
- `build.sbt`: added `iceberg-aws`, AWS SDK dependencies, and Netty
version override for Arrow compatibility
- `PythonWorkflowWorker.scala` / `ComputingUnitManagingResource.scala`:
propagate REST catalog config to Python workers and computing units
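
The `IcebergTableWriter.scala` change swaps `Paths.get(...).resolve(...)` for plain string concatenation. A likely reason (an inference from the diff, not something the author states) is that filesystem path types collapse `//`, which damages object-store URIs such as `s3://...`. The Python sketch below shows the same effect with `pathlib` and a string-based join in the spirit of the new code. `data_file_location` is a hypothetical name and the table location is made up.

```python
from pathlib import PurePosixPath


def data_file_location(table_location: str, writer_identifier: str, filename_idx: int) -> str:
    """Join a table location and a file name without touching the URI scheme."""
    # Same idea as the Scala change: drop one trailing "/" and concatenate strings.
    location = table_location[:-1] if table_location.endswith("/") else table_location
    return f"{location}/{writer_identifier}_{filename_idx}"


# Illustrative location only; real locations come from the catalog.
table_location = "s3://texera-iceberg/operator-port-result/t1/"

# A path type collapses the "//" after the scheme, so the result is no longer an S3 URI.
broken = str(PurePosixPath(table_location) / "writer0_0")

# String concatenation keeps the scheme intact.
fixed = data_file_location(table_location, "writer0", 0)
```

For a local warehouse directory both approaches give the same result, so the string form should remain compatible with the `postgres` catalog default.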
**Python changes:**
- `iceberg_catalog_instance.py` / `iceberg_utils.py`: added REST catalog
support via PyIceberg
- `storage_config.py`: added REST catalog configuration parsing
- `texera_run_python_worker.py`: accept REST catalog config from Scala
side
- `requirements.txt`: added s3fs, aiobotocore, and botocore pins for S3
access (the PyIceberg pin is unchanged in this diff)
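
To make the Python-side wiring concrete, here is a minimal sketch of the property map that `create_rest_catalog` passes to PyIceberg's `load_catalog`, together with the catalog-type dispatch in `IcebergCatalogInstance`. It deliberately stops short of calling `load_catalog` so that it runs without a catalog server. `rest_catalog_properties` and `pick_catalog_factory` are hypothetical names, and the endpoint and credentials in the usage are placeholders.

```python
from typing import Callable, Dict


def rest_catalog_properties(
    warehouse_name: str,
    rest_uri: str,
    s3_endpoint: str,
    s3_region: str,
    s3_username: str,
    s3_password: str,
) -> Dict[str, str]:
    """Build the same keys that create_rest_catalog in this diff hands to load_catalog."""
    return {
        "type": "rest",
        "uri": rest_uri,
        "warehouse": warehouse_name,
        "s3.endpoint": s3_endpoint,
        "s3.access-key-id": s3_username,
        "s3.secret-access-key": s3_password,
        "s3.region": s3_region,
        # Path-style access is what MinIO-style endpoints usually expect.
        "s3.path-style-access": "true",
    }


def pick_catalog_factory(catalog_type: str, factories: Dict[str, Callable]) -> Callable:
    """Hypothetical dispatch: map "postgres" / "rest" to a factory, reject anything else."""
    try:
        return factories[catalog_type]
    except KeyError:
        raise ValueError(f"Unsupported catalog type: {catalog_type}") from None


props = rest_catalog_properties(
    warehouse_name="texera",
    rest_uri="http://localhost:8181/catalog",
    s3_endpoint="http://localhost:9000",  # placeholder MinIO endpoint
    s3_region="us-west-2",
    s3_username="example-access-key",  # placeholder credential
    s3_password="example-secret-key",  # placeholder credential
)
```

With PyIceberg installed and a REST catalog running, `load_catalog("texera_iceberg", **props)` corresponds to the call made in the diff.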
### Any related issues, documentation, discussions?
Closes #4340
### How was this PR tested?
Tested manually.
### Was this PR authored or co-authored using generative AI tooling?
co-authored with Claude
---------
Signed-off-by: Meng Wang <[email protected]>
Co-authored-by: Jiadong Bai <[email protected]>
---
amber/requirements.txt | 3 ++
.../storage/iceberg/iceberg_catalog_instance.py | 36 +++++++++++++-----
.../python/core/storage/iceberg/iceberg_utils.py | 40 +++++++++++++++++++-
.../core/storage/iceberg/test_iceberg_document.py | 3 ++
.../src/main/python/core/storage/storage_config.py | 12 +++++-
.../pytexera/storage/test_large_binary_manager.py | 3 ++
amber/src/main/python/texera_run_python_worker.py | 6 +++
.../pythonworker/PythonWorkflowWorker.scala | 3 ++
common/config/src/main/resources/storage.conf | 14 +++++++
.../amber/config/EnvironmentalVariable.scala | 1 +
.../apache/texera/amber/config/StorageConfig.scala | 4 +-
common/workflow-core/build.sbt | 19 +++++++++-
.../core/storage/IcebergCatalogInstance.scala | 2 +-
.../result/iceberg/IcebergTableWriter.scala | 6 +--
.../org/apache/texera/amber/util/IcebergUtil.scala | 44 +++++++++++++++++-----
.../resource/ComputingUnitManagingResource.scala | 2 +
16 files changed, 171 insertions(+), 27 deletions(-)
diff --git a/amber/requirements.txt b/amber/requirements.txt
index e972830363..b545fd9654 100644
--- a/amber/requirements.txt
+++ b/amber/requirements.txt
@@ -43,6 +43,9 @@ bidict==0.22.0
cached_property==1.5.2
psutil==5.9.0
tzlocal==2.1
+s3fs==2025.9.0
+aiobotocore==2.25.1
+botocore==1.40.53
pyiceberg==0.11.1
readerwriterlock==1.0.9
tenacity==8.5.0
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py
index b1478fadf0..0059808f9f 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py
@@ -18,14 +18,17 @@
from pyiceberg.catalog import Catalog
from typing import Optional
-from core.storage.iceberg.iceberg_utils import create_postgres_catalog
+from core.storage.iceberg.iceberg_utils import (
+ create_postgres_catalog,
+ create_rest_catalog,
+)
from core.storage.storage_config import StorageConfig
class IcebergCatalogInstance:
"""
IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
- Currently only postgres SQL catalog is supported.
+ Supports postgres SQL catalog and REST catalog.
- Provides a single shared catalog for all Iceberg table-related operations.
- Lazily initializes the catalog on first access.
- Supports replacing the catalog instance for testing or reconfiguration.
@@ -39,16 +42,31 @@ class IcebergCatalogInstance:
Retrieves the singleton Iceberg catalog instance.
- If the catalog is not initialized, it is lazily created using the configured
properties.
+ - Supports "postgres" and "rest" catalog types.
:return: the Iceberg catalog instance.
"""
if cls._instance is None:
- cls._instance = create_postgres_catalog(
- "texera_iceberg",
- StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
- StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
- StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
- StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
- )
+ catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE
+ if catalog_type == "postgres":
+ cls._instance = create_postgres_catalog(
+ "texera_iceberg",
+ StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
+ StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
+ StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
+ StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
+ )
+ elif catalog_type == "rest":
+ cls._instance = create_rest_catalog(
+ "texera_iceberg",
+ StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME,
+ StorageConfig.ICEBERG_REST_CATALOG_URI,
+ StorageConfig.S3_ENDPOINT,
+ StorageConfig.S3_REGION,
+ StorageConfig.S3_AUTH_USERNAME,
+ StorageConfig.S3_AUTH_PASSWORD,
+ )
+ else:
+ raise ValueError(f"Unsupported catalog type: {catalog_type}")
return cls._instance
@classmethod
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
index 9e17b2e0e8..f973c72fe8 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
@@ -17,7 +17,7 @@
import pyarrow as pa
import pyiceberg.table
-from pyiceberg.catalog import Catalog
+from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan
@@ -153,6 +153,44 @@ def create_postgres_catalog(
)
+def create_rest_catalog(
+ catalog_name: str,
+ warehouse_name: str,
+ rest_uri: str,
+ s3_endpoint: str,
+ s3_region: str,
+ s3_username: str,
+ s3_password: str,
+) -> Catalog:
+ """
+ Creates a REST catalog instance by connecting to a REST endpoint.
+ - Configures the catalog to interact with a REST endpoint.
+ - The warehouse_name parameter specifies the warehouse identifier.
+ - Configures S3FileIO for MinIO/S3 storage backend.
+ :param catalog_name: the name of the catalog.
+ :param warehouse_name: the warehouse identifier.
+ :param rest_uri: the URI of the REST catalog endpoint.
+ :param s3_endpoint: the S3 endpoint URL.
+ :param s3_region: the S3 region.
+ :param s3_username: the S3 access key ID.
+ :param s3_password: the S3 secret access key.
+ :return: a Catalog instance (REST catalog).
+ """
+ return load_catalog(
+ catalog_name,
+ **{
+ "type": "rest",
+ "uri": rest_uri,
+ "warehouse": warehouse_name,
+ "s3.endpoint": s3_endpoint,
+ "s3.access-key-id": s3_username,
+ "s3.secret-access-key": s3_password,
+ "s3.region": s3_region,
+ "s3.path-style-access": "true",
+ },
+ )
+
+
def create_table(
catalog: Catalog,
table_namespace: str,
diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 34711beb65..9b374f7d5c 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -37,9 +37,12 @@ from proto.org.apache.texera.amber.core import (
# Hardcoded storage config only for test purposes.
StorageConfig.initialize(
+ catalog_type="postgres",
postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog",
postgres_username="texera",
postgres_password="password",
+ rest_catalog_uri="http://localhost:8181/catalog/",
+ rest_catalog_warehouse_name="texera",
table_result_namespace="operator-port-result",
directory_path="../../../../../../amber/user-resources/workflow-results",
commit_batch_size=4096,
diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py
index c55495ea14..0e47bdb71a 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -25,14 +25,17 @@ class StorageConfig:
_initialized = False
+ ICEBERG_CATALOG_TYPE = None
ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
ICEBERG_POSTGRES_CATALOG_USERNAME = None
ICEBERG_POSTGRES_CATALOG_PASSWORD = None
+ ICEBERG_REST_CATALOG_URI = None
+ ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
- # S3 configs (for large_binary_manager module)
+ # S3 configs
S3_ENDPOINT = None
S3_REGION = None
S3_AUTH_USERNAME = None
@@ -41,9 +44,12 @@ class StorageConfig:
@classmethod
def initialize(
cls,
+ catalog_type,
postgres_uri_without_scheme,
postgres_username,
postgres_password,
+ rest_catalog_uri,
+ rest_catalog_warehouse_name,
table_result_namespace,
directory_path,
commit_batch_size,
@@ -57,9 +63,13 @@ class StorageConfig:
"Storage config has already been initialized and cannot be modified."
)
+ cls.ICEBERG_CATALOG_TYPE = catalog_type
cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
+ cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri
+ cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name
+
cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
index a657f244f3..64c7080e52 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
@@ -27,9 +27,12 @@ class TestLargeBinaryManager:
"""Initialize StorageConfig for tests."""
if not StorageConfig._initialized:
StorageConfig.initialize(
+ catalog_type="postgres",
postgres_uri_without_scheme="localhost:5432/test",
postgres_username="test",
postgres_password="test",
+ rest_catalog_uri="http://localhost:8181/catalog/",
+ rest_catalog_warehouse_name="texera",
table_result_namespace="test",
directory_path="/tmp/test",
commit_batch_size=1000,
diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py
index 3ebf81c201..8687298f81 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -45,9 +45,12 @@ if __name__ == "__main__":
output_port,
logger_level,
r_path,
+ iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
+ iceberg_rest_catalog_uri,
+ iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
@@ -58,9 +61,12 @@ if __name__ == "__main__":
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
+ iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
+ iceberg_rest_catalog_uri,
+ iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index 558b99c9b7..d2bc5f5025 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -178,9 +178,12 @@ class PythonWorkflowWorker(
Integer.toString(pythonProxyServer.getPortNumber.get()),
UdfConfig.pythonLogStreamHandlerLevel,
RENVPath,
+ StorageConfig.icebergCatalogType,
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
StorageConfig.icebergPostgresCatalogUsername,
StorageConfig.icebergPostgresCatalogPassword,
+ StorageConfig.icebergRESTCatalogUri,
+ StorageConfig.icebergRESTCatalogWarehouseName,
StorageConfig.icebergTableResultNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString,
diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf
index 276d1491cd..cf2e23341b 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -27,6 +27,20 @@ storage {
rest-uri = ""
rest-uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI} # the uri of the rest catalog, not needed unless using REST catalog
+ rest {
+ uri = "http://localhost:8181/catalog"
+ uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI}
+
+ warehouse-name = "texera"
+ warehouse-name = ${?STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME}
+
+ region = "us-west-2"
+ region = ${?STORAGE_ICEBERG_CATALOG_REST_REGION}
+
+ s3-bucket = "texera-iceberg"
+ s3-bucket = ${?STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET}
+ }
+
postgres {
# do not include scheme in the uri as Python and Java use different schemes
uri-without-scheme = "localhost:5432/texera_iceberg_catalog"
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
index 1adc323305..339b57f52a 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
@@ -52,6 +52,7 @@ object EnvironmentalVariable {
// Iceberg Catalog
val ENV_ICEBERG_CATALOG_TYPE = "STORAGE_ICEBERG_CATALOG_TYPE"
val ENV_ICEBERG_CATALOG_REST_URI = "STORAGE_ICEBERG_CATALOG_REST_URI"
+ val ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME = "STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME"
// Iceberg Postgres Catalog
val ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME =
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 3bc1e05a9b..728e3c0c2d 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -36,7 +36,9 @@ object StorageConfig {
// Iceberg specifics
val icebergCatalogType: String = conf.getString("storage.iceberg.catalog.type")
- val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest-uri")
+ val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest.uri")
+ val icebergRESTCatalogWarehouseName: String =
+ conf.getString("storage.iceberg.catalog.rest.warehouse-name")
// Iceberg Postgres specifics
val icebergPostgresCatalogUriWithoutScheme: String =
diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt
index db91668513..4f9c37b171 100644
--- a/common/workflow-core/build.sbt
+++ b/common/workflow-core/build.sbt
@@ -134,10 +134,14 @@ dependencyOverrides ++= Seq(
"io.netty" % "netty-codec" % nettyVersion,
"io.netty" % "netty-codec-http" % nettyVersion,
"io.netty" % "netty-codec-http2" % nettyVersion,
+ "io.netty" % "netty-codec-socks" % nettyVersion,
"io.netty" % "netty-common" % nettyVersion,
"io.netty" % "netty-handler" % nettyVersion,
+ "io.netty" % "netty-handler-proxy" % nettyVersion,
"io.netty" % "netty-resolver" % nettyVersion,
"io.netty" % "netty-transport" % nettyVersion,
+ "io.netty" % "netty-transport-classes-epoll" % nettyVersion,
+ "io.netty" % "netty-transport-native-epoll" % nettyVersion,
"io.netty" % "netty-transport-native-unix-common" % nettyVersion
)
@@ -167,6 +171,10 @@ libraryDependencies ++= Seq(
excludeJackson,
excludeJacksonModule
),
+ "org.apache.iceberg" % "iceberg-aws" % "1.7.1" excludeAll(
+ excludeJackson,
+ excludeJacksonModule
+ ),
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
@@ -208,6 +216,13 @@ libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.29.51" excludeAll(
ExclusionRule(organization = "io.netty")
),
- "software.amazon.awssdk" % "auth" % "2.29.51",
- "software.amazon.awssdk" % "regions" % "2.29.51",
+ "software.amazon.awssdk" % "auth" % "2.29.51" excludeAll(
+ ExclusionRule(organization = "io.netty")
+ ),
+ "software.amazon.awssdk" % "regions" % "2.29.51" excludeAll(
+ ExclusionRule(organization = "io.netty")
+ ),
+ "software.amazon.awssdk" % "sts" % "2.29.51" excludeAll(
+ ExclusionRule(organization = "io.netty")
+ ),
)
\ No newline at end of file
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala
index e3512874c9..bb9f2d8bf2 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala
@@ -52,7 +52,7 @@ object IcebergCatalogInstance {
case "rest" =>
IcebergUtil.createRestCatalog(
"texera_iceberg",
- StorageConfig.fileStorageDirectoryPath
+ StorageConfig.icebergRESTCatalogWarehouseName
)
case "postgres" =>
IcebergUtil.createPostgresCatalog(
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala
index 549cb4b9d1..06d04e407f 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala
@@ -29,7 +29,6 @@ import org.apache.iceberg.io.{DataWriter, OutputFile}
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.{Schema, Table}
-import java.nio.file.Paths
import scala.collection.mutable.ArrayBuffer
/**
@@ -107,10 +106,11 @@ private[storage] class IcebergTableWriter[T](
private def flushBuffer(): Unit = {
if (buffer.nonEmpty) {
// Create a unique file path using the writer's identifier and the filename index
- val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}")
+ val location = table.location().stripSuffix("/")
+ val filepathString = s"$location/${writerIdentifier}_$filenameIdx"
// Increment the filename index by 1
filenameIdx += 1
- val outputFile: OutputFile = table.io().newOutputFile(filepath.toString)
+ val outputFile: OutputFile = table.io().newOutputFile(filepathString)
// Create a Parquet data writer to write a new file
val dataWriter: DataWriter[Record] = Parquet
.writeData(outputFile)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
index ad6ac07c1f..d6e406a53f 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
@@ -22,9 +22,10 @@ package org.apache.texera.amber.util
import org.apache.texera.amber.config.StorageConfig
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, LargeBinary, Schema, Tuple}
import org.apache.hadoop.conf.Configuration
-import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+import org.apache.iceberg.catalog.{Catalog, SupportsNamespaces, TableIdentifier}
import org.apache.iceberg.data.parquet.GenericParquetReaders
import org.apache.iceberg.data.{GenericRecord, Record}
+import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO}
import org.apache.iceberg.io.{CloseableIterable, InputFile}
import org.apache.iceberg.jdbc.JdbcCatalog
@@ -40,6 +41,8 @@ import org.apache.iceberg.{
TableProperties,
Schema => IcebergSchema
}
+import org.apache.iceberg.catalog.Namespace
+import org.apache.iceberg.exceptions.AlreadyExistsException
import java.nio.ByteBuffer
import java.nio.file.Path
@@ -101,17 +104,26 @@ object IcebergUtil {
*/
def createRestCatalog(
catalogName: String,
- warehouse: Path
+ warehouse: String
): RESTCatalog = {
val catalog = new RESTCatalog()
- catalog.initialize(
- catalogName,
- Map(
- "warehouse" -> warehouse.toString,
- CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri,
- CatalogProperties.FILE_IO_IMPL -> classOf[HadoopFileIO].getName
- ).asJava
+
+ // Build base properties map
+ var properties = Map(
+ "warehouse" -> warehouse,
+ CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri
)
+
+ properties = properties ++ Map(
+ CatalogProperties.FILE_IO_IMPL -> classOf[S3FileIO].getName,
+ "s3.endpoint" -> StorageConfig.s3Endpoint,
+ "s3.access-key-id" -> StorageConfig.s3Username,
+ "s3.secret-access-key" -> StorageConfig.s3Password,
+ "s3.region" -> StorageConfig.s3Region,
+ "s3.path-style-access" -> "true"
+ )
+
+ catalog.initialize(catalogName, properties.asJava)
catalog
}
@@ -165,6 +177,20 @@ object IcebergUtil {
TableProperties.COMMIT_MIN_RETRY_WAIT_MS ->
StorageConfig.icebergTableCommitMinRetryWaitMs.toString
)
+ val namespace = Namespace.of(tableNamespace)
+
+ catalog match {
+ case nsCatalog: SupportsNamespaces =>
+ try nsCatalog.createNamespace(namespace, Map.empty[String, String].asJava)
+ catch {
+ case _: AlreadyExistsException => ()
+ }
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Catalog ${catalog.getClass.getName} does not support namespaces"
+ )
+ }
+
val identifier = TableIdentifier.of(tableNamespace, tableName)
if (catalog.tableExists(identifier) && overrideIfExists) {
catalog.dropTable(identifier)
diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
index 1249d06783..9b214b9755 100644
--- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
@@ -69,6 +69,8 @@ object ComputingUnitManagingResource {
private lazy val computingUnitEnvironmentVariables: Map[String, Any] = Map(
// Variables for saving results to Iceberg
EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> StorageConfig.icebergCatalogType,
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI -> StorageConfig.icebergRESTCatalogUri,
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME -> StorageConfig.icebergRESTCatalogWarehouseName,
EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME -> StorageConfig.icebergPostgresCatalogUriWithoutScheme,
EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME -> StorageConfig.icebergPostgresCatalogUsername,
EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD -> StorageConfig.icebergPostgresCatalogPassword,