This is an automated email from the ASF dual-hosted git repository.
mengw15 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new bb78cca44c test: cover Iceberg REST catalog backend in CI (#4276)
bb78cca44c is described below
commit bb78cca44c963449247d12c7dc1d187da1a2ce62
Author: Meng Wang <[email protected]>
AuthorDate: Sun May 10 08:52:50 2026 -0700
test: cover Iceberg REST catalog backend in CI (#4276)
### What changes were proposed in this PR?
Add two `@IntegrationTest`-tagged specs that round-trip table metadata
via the Iceberg REST catalog against a live Lakekeeper + MinIO stack
brought up by the existing `amber-integration` CI job:
- `IcebergRestCatalogIntegrationSpec` (Scala)
- `test_iceberg_rest_catalog_integration.py` (Python, marked
`pytest.mark.integration`)
The `amber-integration` job now boots MinIO + Lakekeeper, initializes a
warehouse with an S3 storage profile, installs `dev-requirements.txt`
for pytest, and runs `pytest -m integration` after the existing sbt
step. The regular `python` job runs with `-m "not integration"` so the
new Python test is excluded there. The `integration` marker is
registered in `amber/pyproject.toml` as the Python equivalent of the
Scala `@IntegrationTest` Java tag.
### Any related issues, documentation, discussions?
Closes #4994 (sub-issue of #4126).
### How was this PR tested?
CI itself — the new specs run as part of `amber-integration` on every
push of this PR; Lakekeeper boot / warehouse init / REST-path
breakage all surface on that job's status check.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.github/workflows/build.yml | 97 +++++++++++++++++++++-
amber/pyproject.toml | 5 +-
.../IcebergRestCatalogIntegrationSpec.scala | 82 ++++++++++++++++++
.../test_iceberg_rest_catalog_integration.py | 70 ++++++++++++++++
4 files changed, 252 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 0a1ffbef18..9a78bc540f 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -315,11 +315,13 @@ jobs:
python -m pip install uv
if [ -f amber/requirements.txt ]; then uv pip install --system
--index-strategy unsafe-best-match -r amber/requirements.txt; fi
if [ -f amber/operator-requirements.txt ]; then uv pip install
--system --index-strategy unsafe-best-match -r amber/operator-requirements.txt;
fi
+ if [ -f amber/dev-requirements.txt ]; then uv pip install --system
--index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
- name: Create Databases
run: |
psql -h localhost -U postgres -f sql/texera_ddl.sql
psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql
psql -h localhost -U postgres -f sql/texera_lakefs.sql
+ psql -h localhost -U postgres -f sql/texera_lakekeeper.sql
env:
PGPASSWORD: postgres
- name: Setup sbt launcher
@@ -331,6 +333,96 @@ jobs:
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases
-f sql/texera_ddl.sql
env:
PGPASSWORD: postgres
+ - name: Start MinIO
+   # Launch MinIO detached on the host network, then poll its liveness
+   # endpoint. If it never reports live, fail this step with the
+   # container logs instead of letting a later step fail obscurely.
+   run: |
+     docker run -d --name minio --network host \
+       -e MINIO_ROOT_USER=texera_minio \
+       -e MINIO_ROOT_PASSWORD=password \
+       minio/minio:RELEASE.2025-02-28T09-55-16Z server /data
+     # Timing out is fatal below, so allow up to ~30s for startup.
+     for i in $(seq 1 30); do
+       curl -sf http://localhost:9000/minio/health/live && break
+       echo "Waiting for MinIO... (attempt $i)"
+       sleep 1
+     done
+     curl -sf http://localhost:9000/minio/health/live || {
+       echo "MinIO failed to start. Container logs:"
+       docker logs minio
+       exit 1
+     }
+ - name: Start Lakekeeper
+   # Run the Lakekeeper `migrate` command once, start `serve` detached,
+   # then wait for the container's own healthcheck to pass.
+   env:
+     LAKEKEEPER__PG_DATABASE_URL_READ: postgres://postgres:postgres@localhost:5432/texera_lakekeeper
+     LAKEKEEPER__PG_DATABASE_URL_WRITE: postgres://postgres:postgres@localhost:5432/texera_lakekeeper
+     LAKEKEEPER__PG_ENCRYPTION_KEY: texera_key
+   run: |
+     # `-e NAME` with no value hands the variable through from this
+     # step's env block, so the connection settings are written once.
+     docker run --rm --network host \
+       -e LAKEKEEPER__PG_DATABASE_URL_READ \
+       -e LAKEKEEPER__PG_DATABASE_URL_WRITE \
+       -e LAKEKEEPER__PG_ENCRYPTION_KEY \
+       vakamo/lakekeeper:v0.11.0 migrate
+     docker run -d --name lakekeeper --network host \
+       -e LAKEKEEPER__PG_DATABASE_URL_READ \
+       -e LAKEKEEPER__PG_DATABASE_URL_WRITE \
+       -e LAKEKEEPER__PG_ENCRYPTION_KEY \
+       -e LAKEKEEPER__METRICS_PORT=9091 \
+       vakamo/lakekeeper:v0.11.0 serve
+     for attempt in $(seq 1 3); do
+       if docker exec lakekeeper /home/nonroot/lakekeeper healthcheck; then
+         break
+       fi
+       echo "Waiting for Lakekeeper... (attempt $attempt)"
+       sleep 1
+     done
+     # Final verdict: dump the server logs and fail if still unhealthy.
+     if ! docker exec lakekeeper /home/nonroot/lakekeeper healthcheck; then
+       echo "Lakekeeper failed to start. Container logs:"
+       docker logs lakekeeper
+       exit 1
+     fi
+ - name: Initialize Lakekeeper warehouse
+   # Pull defaults out of storage.conf so this step doesn't duplicate
+   # values that already live in the runtime config. Each scalar in
+   # storage.conf is followed by a `${?VAR}` env-override line whose
+   # name is globally unique, so anchoring grep on that override line
+   # selects the value unambiguously across nested scopes.
+   run: |
+     CONF=common/config/src/main/resources/storage.conf
+     # Print the quoted default on the line preceding the `${?VAR}`
+     # override for VAR ($1). Returns non-zero when nothing is found so
+     # that a renamed or removed key aborts the step instead of feeding
+     # an empty value into the commands below.
+     extract() {
+       local value
+       value=$(grep -B1 -F "\${?$1}" "$CONF" | head -1 | sed -E 's/.*"([^"]+)".*/\1/')
+       if [ -z "$value" ]; then
+         echo "No default found for $1 in $CONF" >&2
+         return 1
+       fi
+       printf '%s\n' "$value"
+     }
+     WAREHOUSE_NAME=$(extract STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME)
+     S3_BUCKET=$(extract STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET)
+     S3_ENDPOINT=$(extract STORAGE_S3_ENDPOINT)
+     S3_REGION=$(extract STORAGE_S3_REGION)
+     S3_USERNAME=$(extract STORAGE_S3_AUTH_USERNAME)
+     S3_PASSWORD=$(extract STORAGE_S3_AUTH_PASSWORD)
+     # Lakekeeper's management API lives on the same host as the
+     # catalog; strip the /catalog suffix off the catalog URI to get
+     # the base URL.
+     REST_URI=$(extract STORAGE_ICEBERG_CATALOG_REST_URI)
+     LAKEKEEPER_BASE=${REST_URI%/catalog}
+     LAKEKEEPER_BASE=${LAKEKEEPER_BASE%/}
+
+     # Create the bucket that backs the warehouse (no-op if it exists).
+     docker run --rm --network host --entrypoint sh minio/mc -c \
+       "mc alias set minio $S3_ENDPOINT $S3_USERNAME $S3_PASSWORD && \
+       mc mb --ignore-existing minio/$S3_BUCKET"
+     # Best-effort: a failure here is deliberately ignored.
+     # NOTE(review): presumably because the default project may already
+     # exist on the server - confirm.
+     curl -sf -X POST -H 'Content-Type: application/json' \
+       -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \
+       "$LAKEKEEPER_BASE/management/v1/project" || true
+     # Register the warehouse; as the last command, its status decides
+     # whether the step passes.
+     curl -sf -X POST -H 'Content-Type: application/json' -d @- \
+       "$LAKEKEEPER_BASE/management/v1/warehouse" <<EOF
+     {
+       "warehouse-name": "$WAREHOUSE_NAME",
+       "project-id": "00000000-0000-0000-0000-000000000000",
+       "storage-profile": {
+         "type": "s3",
+         "bucket": "$S3_BUCKET",
+         "region": "$S3_REGION",
+         "endpoint": "$S3_ENDPOINT",
+         "flavor": "s3-compat",
+         "path-style-access": true,
+         "sts-enabled": false
+       },
+       "storage-credential": {
+         "type": "s3",
+         "credential-type": "access-key",
+         "aws-access-key-id": "$S3_USERNAME",
+         "aws-secret-access-key": "$S3_PASSWORD"
+       }
+     }
+     EOF
- name: Lint and run amber integration tests
# AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
# keep only @org.apache.texera.amber.tags.IntegrationTest
@@ -351,6 +443,9 @@ jobs:
sbt scalafmtCheckAll \
"scalafixAll --check" \
"WorkflowExecutionService/test"
+ - name: Run Python integration tests
+   # Runs pytest from the amber directory, selecting only tests that
+   # carry the `integration` marker.
+   run: cd amber && pytest -m integration -sv
platform:
# Per-service build, test, and license check for the non-amber Scala
@@ -524,7 +619,7 @@ jobs:
if [ -f amber/dev-requirements.txt ]; then uv pip install --system
-r amber/dev-requirements.txt; fi
- name: Test with pytest
run: |
- cd amber && pytest --cov=src/main/python --cov-report=xml -sv
+ cd amber && pytest -m "not integration" --cov=src/main/python
--cov-report=xml -sv
- name: Upload python coverage to Codecov
if: matrix.python-version == '3.12' && always()
uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe
# v5.5.4
diff --git a/amber/pyproject.toml b/amber/pyproject.toml
index 46f8c55db6..bb4659282b 100644
--- a/amber/pyproject.toml
+++ b/amber/pyproject.toml
@@ -39,4 +39,7 @@ testpaths = ["src/test/python"]
# to mirror `src/main/python`'s __init__.py layout to avoid duplicate
# package names. Required for src-style test layouts per the pytest
# docs (https://docs.pytest.org/en/stable/explanation/goodpractices.html).
-addopts = "--import-mode=importlib"
\ No newline at end of file
+addopts = "--import-mode=importlib"
+# Custom pytest markers. `integration` is what `pytest -m integration`
+# selects and `pytest -m "not integration"` excludes.
+markers = [
+ "integration: end-to-end test routed to the amber-integration CI job",
+]
\ No newline at end of file
diff --git
a/amber/src/test/integration/org/apache/texera/amber/storage/iceberg/IcebergRestCatalogIntegrationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/storage/iceberg/IcebergRestCatalogIntegrationSpec.scala
new file mode 100644
index 0000000000..9f2133b608
--- /dev/null
+++
b/amber/src/test/integration/org/apache/texera/amber/storage/iceberg/IcebergRestCatalogIntegrationSpec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.storage.iceberg
+
+import org.apache.iceberg.catalog.TableIdentifier
+import org.apache.iceberg.exceptions.NoSuchTableException
+import org.apache.iceberg.rest.RESTCatalog
+import org.apache.texera.amber.config.StorageConfig
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.tags.IntegrationTest
+import org.apache.texera.amber.util.IcebergUtil
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.util.UUID
+
+/**
+  * Round-trip table metadata via the REST catalog.
+  *
+  * Creates a uniquely named table through a [[RESTCatalog]], loads it
+  * back, checks the schema, then drops it and verifies it is gone. The
+  * catalog client is opened once for the spec and closed afterwards.
+  */
+@IntegrationTest
+class IcebergRestCatalogIntegrationSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // Assigned in beforeAll; stays null if catalog creation throws.
+  private var restCatalog: RESTCatalog = _
+
+  // Catalog name and namespace happen to share the same string; they are
+  // kept as separate values because they are independent settings.
+  private val catalogName = "rest_integration_test"
+  private val testNamespace = "rest_integration_test"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    restCatalog = IcebergUtil.createRestCatalog(
+      catalogName,
+      StorageConfig.icebergRESTCatalogWarehouseName
+    )
+  }
+
+  /** Release the REST client so the suite does not leak it. */
+  override def afterAll(): Unit = {
+    try {
+      if (restCatalog != null) restCatalog.close()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  behavior of "Iceberg REST catalog"
+
+  it should "round-trip table metadata via the REST catalog" in {
+    val amberSchema = Schema(
+      List(
+        new Attribute("id", AttributeType.INTEGER),
+        new Attribute("name", AttributeType.STRING)
+      )
+    )
+    val icebergSchema = IcebergUtil.toIcebergSchema(amberSchema)
+
+    // A random suffix keeps repeated or concurrent runs from colliding.
+    val tableName = s"rest_table_${UUID.randomUUID().toString.replace("-", "")}"
+    val identifier = TableIdentifier.of(testNamespace, tableName)
+
+    IcebergUtil.createTable(
+      restCatalog,
+      testNamespace,
+      tableName,
+      icebergSchema,
+      overrideIfExists = true
+    )
+    assert(restCatalog.tableExists(identifier))
+
+    val loaded = restCatalog.loadTable(identifier)
+    assert(loaded.schema().sameSchema(icebergSchema))
+
+    restCatalog.dropTable(identifier, false)
+    assert(!restCatalog.tableExists(identifier))
+    intercept[NoSuchTableException] {
+      restCatalog.loadTable(identifier)
+    }
+  }
+}
diff --git
a/amber/src/test/python/core/storage/iceberg/test_iceberg_rest_catalog_integration.py
b/amber/src/test/python/core/storage/iceberg/test_iceberg_rest_catalog_integration.py
new file mode 100644
index 0000000000..642fbb08e5
--- /dev/null
+++
b/amber/src/test/python/core/storage/iceberg/test_iceberg_rest_catalog_integration.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import pytest
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+from core.storage.iceberg.iceberg_utils import create_rest_catalog
+
+pytestmark = pytest.mark.integration
+
+
[email protected]
+def rest_catalog():
+ return create_rest_catalog(
+ catalog_name="rest_integration_test",
+ warehouse_name="texera",
+ rest_uri="http://localhost:8181/catalog/",
+ s3_endpoint="http://localhost:9000",
+ s3_region="us-west-2",
+ s3_username="texera_minio",
+ s3_password="password",
+ )
+
+
+def test_rest_catalog_round_trip(rest_catalog):
+ """Round-trip table metadata via the REST catalog (Lakekeeper)."""
+ namespace = "rest_integration_test_ns"
+ table_name = f"rest_test_{uuid.uuid4().hex}"
+ identifier = f"{namespace}.{table_name}"
+
+ schema = Schema(
+ NestedField(field_id=1, name="id", field_type=IntegerType(),
required=False),
+ NestedField(field_id=2, name="name", field_type=StringType(),
required=False),
+ )
+
+ rest_catalog.create_namespace_if_not_exists(namespace)
+ if rest_catalog.table_exists(identifier):
+ rest_catalog.drop_table(identifier)
+
+ # create — exercises REST createTable.
+ rest_catalog.create_table(identifier=identifier, schema=schema)
+ assert rest_catalog.table_exists(identifier)
+
+ # load — exercises REST loadTable (metadata fetch).
+ loaded = rest_catalog.load_table(identifier)
+ assert len(loaded.schema().fields) == 2
+
+ # drop — exercises REST dropTable.
+ rest_catalog.drop_table(identifier)
+ assert not rest_catalog.table_exists(identifier)
+ with pytest.raises(NoSuchTableError):
+ rest_catalog.load_table(identifier)