This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 3e45527beb Core: Add storage credentials to PlanTableScanResponse
(#14518)
3e45527beb is described below
commit 3e45527bebca8a316e2722e0b2a55b6b41f6b7ee
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Sun Nov 23 03:57:46 2025 +0100
Core: Add storage credentials to PlanTableScanResponse (#14518)
---
.../rest/responses/PlanTableScanResponse.java | 26 ++-
.../responses/PlanTableScanResponseParser.java | 33 ++-
.../responses/TestPlanTableScanResponseParser.java | 229 +++++++++++++++++++++
3 files changed, 278 insertions(+), 10 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java
b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java
index 95f862de96..1b4bb86e65 100644
---
a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java
+++
b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java
@@ -25,11 +25,15 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.credentials.Credential;
public class PlanTableScanResponse extends BaseScanTaskResponse {
private final PlanStatus planStatus;
private final String planId;
+ private final List<Credential> credentials;
private PlanTableScanResponse(
PlanStatus planStatus,
@@ -37,10 +41,12 @@ public class PlanTableScanResponse extends
BaseScanTaskResponse {
List<String> planTasks,
List<FileScanTask> fileScanTasks,
List<DeleteFile> deleteFiles,
- Map<Integer, PartitionSpec> specsById) {
+ Map<Integer, PartitionSpec> specsById,
+ List<Credential> credentials) {
super(planTasks, fileScanTasks, deleteFiles, specsById);
this.planStatus = planStatus;
this.planId = planId;
+ this.credentials = credentials;
validate();
}
@@ -52,6 +58,10 @@ public class PlanTableScanResponse extends
BaseScanTaskResponse {
return planId;
}
+ public List<Credential> credentials() {
+ return credentials != null ? credentials : ImmutableList.of();
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -98,6 +108,7 @@ public class PlanTableScanResponse extends
BaseScanTaskResponse {
public static class Builder extends BaseScanTaskResponse.Builder<Builder,
PlanTableScanResponse> {
private PlanStatus planStatus;
private String planId;
+ private final List<Credential> credentials = Lists.newArrayList();
/**
* @deprecated since 1.11.0, visibility will be reduced in 1.12.0; use
{@link
@@ -116,10 +127,21 @@ public class PlanTableScanResponse extends
BaseScanTaskResponse {
return this;
}
+ public Builder withCredentials(List<Credential> credentialsToAdd) {
+ credentials.addAll(credentialsToAdd);
+ return this;
+ }
+
@Override
public PlanTableScanResponse build() {
return new PlanTableScanResponse(
- planStatus, planId, planTasks(), fileScanTasks(), deleteFiles(),
specsById());
+ planStatus,
+ planId,
+ planTasks(),
+ fileScanTasks(),
+ deleteFiles(),
+ specsById(),
+ credentials);
}
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java
b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java
index 049d596fcb..df2ad94a32 100644
---
a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java
+++
b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java
@@ -30,12 +30,15 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.TableScanResponseParser;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.CredentialParser;
import org.apache.iceberg.util.JsonUtil;
public class PlanTableScanResponseParser {
private static final String STATUS = "status";
private static final String PLAN_ID = "plan-id";
private static final String PLAN_TASKS = "plan-tasks";
+ private static final String STORAGE_CREDENTIALS = "storage-credentials";
private PlanTableScanResponseParser() {}
@@ -64,6 +67,15 @@ public class PlanTableScanResponseParser {
JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen);
}
+ if (!response.credentials().isEmpty()) {
+ gen.writeArrayFieldStart(STORAGE_CREDENTIALS);
+ for (Credential credential : response.credentials()) {
+ CredentialParser.toJson(credential, gen);
+ }
+
+ gen.writeEndArray();
+ }
+
TableScanResponseParser.serializeScanTasks(
response.fileScanTasks(), response.deleteFiles(),
response.specsById(), gen);
@@ -92,13 +104,18 @@ public class PlanTableScanResponseParser {
List<FileScanTask> fileScanTasks =
TableScanResponseParser.parseFileScanTasks(json, deleteFiles,
specsById, caseSensitive);
- return PlanTableScanResponse.builder()
- .withPlanId(planId)
- .withPlanStatus(planStatus)
- .withPlanTasks(planTasks)
- .withFileScanTasks(fileScanTasks)
- .withDeleteFiles(deleteFiles)
- .withSpecsById(specsById)
- .build();
+ PlanTableScanResponse.Builder builder =
+ PlanTableScanResponse.builder()
+ .withPlanId(planId)
+ .withPlanStatus(planStatus)
+ .withPlanTasks(planTasks)
+ .withFileScanTasks(fileScanTasks)
+ .withDeleteFiles(deleteFiles)
+ .withSpecsById(specsById);
+
+ if (json.hasNonNull(STORAGE_CREDENTIALS)) {
+
builder.withCredentials(LoadCredentialsResponseParser.fromJson(json).credentials());
+ }
+ return builder.build();
}
}
diff --git
a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java
b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java
index 48f8a5dcb6..c2867ca3a2 100644
---
a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java
+++
b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java
@@ -38,7 +38,11 @@ import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.junit.jupiter.api.Test;
public class TestPlanTableScanResponseParser {
@@ -440,4 +444,229 @@ public class TestPlanTableScanResponseParser {
assertThat(PlanTableScanResponseParser.toJson(copyResponse)).isEqualTo(expectedJson);
}
+
+ @Test
+ public void emptyOrInvalidCredentials() {
+ assertThat(
+ PlanTableScanResponseParser.fromJson(
+ "{\"status\": \"completed\",\"storage-credentials\":
null}",
+ PARTITION_SPECS_BY_ID,
+ false)
+ .credentials())
+ .isEmpty();
+
+ assertThat(
+ PlanTableScanResponseParser.fromJson(
+ "{\"status\": \"completed\",\"storage-credentials\": []}",
+ PARTITION_SPECS_BY_ID,
+ false)
+ .credentials())
+ .isEmpty();
+
+ assertThatThrownBy(
+ () ->
+ PlanTableScanResponseParser.fromJson(
+ "{\"status\": \"completed\",\"storage-credentials\":
\"invalid\"}",
+ PARTITION_SPECS_BY_ID,
+ false))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse credentials from non-array: \"invalid\"");
+ }
+
+ @Test
+ public void roundTripSerdeWithCredentials() {
+ List<Credential> credentials =
+ ImmutableList.of(
+ ImmutableCredential.builder()
+ .prefix("s3://custom-uri")
+ .config(
+ ImmutableMap.of(
+ "s3.access-key-id",
+ "keyId",
+ "s3.secret-access-key",
+ "accessKey",
+ "s3.session-token",
+ "sessionToken"))
+ .build(),
+ ImmutableCredential.builder()
+ .prefix("gs://custom-uri")
+ .config(
+ ImmutableMap.of(
+ "gcs.oauth2.token", "gcsToken1",
"gcs.oauth2.token-expires-at", "1000"))
+ .build(),
+ ImmutableCredential.builder()
+ .prefix("gs")
+ .config(
+ ImmutableMap.of(
+ "gcs.oauth2.token", "gcsToken2",
"gcs.oauth2.token-expires-at", "2000"))
+ .build());
+
+ PlanTableScanResponse response =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withCredentials(credentials)
+ .withSpecsById(PARTITION_SPECS_BY_ID)
+ .build();
+
+ String expectedJson =
+ "{\n"
+ + " \"status\" : \"completed\",\n"
+ + " \"storage-credentials\" : [ {\n"
+ + " \"prefix\" : \"s3://custom-uri\",\n"
+ + " \"config\" : {\n"
+ + " \"s3.access-key-id\" : \"keyId\",\n"
+ + " \"s3.secret-access-key\" : \"accessKey\",\n"
+ + " \"s3.session-token\" : \"sessionToken\"\n"
+ + " }\n"
+ + " }, {\n"
+ + " \"prefix\" : \"gs://custom-uri\",\n"
+ + " \"config\" : {\n"
+ + " \"gcs.oauth2.token\" : \"gcsToken1\",\n"
+ + " \"gcs.oauth2.token-expires-at\" : \"1000\"\n"
+ + " }\n"
+ + " }, {\n"
+ + " \"prefix\" : \"gs\",\n"
+ + " \"config\" : {\n"
+ + " \"gcs.oauth2.token\" : \"gcsToken2\",\n"
+ + " \"gcs.oauth2.token-expires-at\" : \"2000\"\n"
+ + " }\n"
+ + " } ]\n"
+ + "}";
+
+ String json = PlanTableScanResponseParser.toJson(response, true);
+ assertThat(json).isEqualTo(expectedJson);
+
+ PlanTableScanResponse fromResponse =
+ PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID,
false);
+ PlanTableScanResponse copyResponse =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(fromResponse.planStatus())
+ .withPlanId(fromResponse.planId())
+ .withSpecsById(PARTITION_SPECS_BY_ID)
+ .withCredentials(credentials)
+ .build();
+
+ assertThat(PlanTableScanResponseParser.toJson(copyResponse,
true)).isEqualTo(expectedJson);
+ }
+
+ @Test
+ public void roundTripSerdeWithValidStatusAndFileScanTasksAndCredentials() {
+ ResidualEvaluator residualEvaluator =
+ ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true);
+ FileScanTask fileScanTask =
+ new BaseFileScanTask(
+ FILE_A,
+ new DeleteFile[] {FILE_A_DELETES},
+ SchemaParser.toJson(SCHEMA),
+ PartitionSpecParser.toJson(SPEC),
+ residualEvaluator);
+
+ List<Credential> credentials =
+ ImmutableList.of(
+ ImmutableCredential.builder()
+ .prefix("s3://custom-uri")
+ .config(
+ ImmutableMap.of(
+ "s3.access-key-id",
+ "keyId",
+ "s3.secret-access-key",
+ "accessKey",
+ "s3.session-token",
+ "sessionToken"))
+ .build(),
+ ImmutableCredential.builder()
+ .prefix("gs://custom-uri")
+ .config(
+ ImmutableMap.of(
+ "gcs.oauth2.token", "gcsToken1",
"gcs.oauth2.token-expires-at", "1000"))
+ .build(),
+ ImmutableCredential.builder()
+ .prefix("gs")
+ .config(
+ ImmutableMap.of(
+ "gcs.oauth2.token", "gcsToken2",
"gcs.oauth2.token-expires-at", "2000"))
+ .build());
+ PlanTableScanResponse response =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withFileScanTasks(List.of(fileScanTask))
+ .withDeleteFiles(List.of(FILE_A_DELETES))
+ .withSpecsById(PARTITION_SPECS_BY_ID)
+ .withCredentials(credentials)
+ .build();
+
+ String expectedJson =
+ "{\n"
+ + " \"status\" : \"completed\",\n"
+ + " \"storage-credentials\" : [ {\n"
+ + " \"prefix\" : \"s3://custom-uri\",\n"
+ + " \"config\" : {\n"
+ + " \"s3.access-key-id\" : \"keyId\",\n"
+ + " \"s3.secret-access-key\" : \"accessKey\",\n"
+ + " \"s3.session-token\" : \"sessionToken\"\n"
+ + " }\n"
+ + " }, {\n"
+ + " \"prefix\" : \"gs://custom-uri\",\n"
+ + " \"config\" : {\n"
+ + " \"gcs.oauth2.token\" : \"gcsToken1\",\n"
+ + " \"gcs.oauth2.token-expires-at\" : \"1000\"\n"
+ + " }\n"
+ + " }, {\n"
+ + " \"prefix\" : \"gs\",\n"
+ + " \"config\" : {\n"
+ + " \"gcs.oauth2.token\" : \"gcsToken2\",\n"
+ + " \"gcs.oauth2.token-expires-at\" : \"2000\"\n"
+ + " }\n"
+ + " } ],\n"
+ + " \"delete-files\" : [ {\n"
+ + " \"spec-id\" : 0,\n"
+ + " \"content\" : \"POSITION_DELETES\",\n"
+ + " \"file-path\" : \"/path/to/data-a-deletes.parquet\",\n"
+ + " \"file-format\" : \"PARQUET\",\n"
+ + " \"partition\" : {\n"
+ + " \"1000\" : 0\n"
+ + " },\n"
+ + " \"file-size-in-bytes\" : 10,\n"
+ + " \"record-count\" : 1\n"
+ + " } ],\n"
+ + " \"file-scan-tasks\" : [ {\n"
+ + " \"data-file\" : {\n"
+ + " \"spec-id\" : 0,\n"
+ + " \"content\" : \"DATA\",\n"
+ + " \"file-path\" : \"/path/to/data-a.parquet\",\n"
+ + " \"file-format\" : \"PARQUET\",\n"
+ + " \"partition\" : {\n"
+ + " \"1000\" : 0\n"
+ + " },\n"
+ + " \"file-size-in-bytes\" : 10,\n"
+ + " \"record-count\" : 1,\n"
+ + " \"sort-order-id\" : 0\n"
+ + " },\n"
+ + " \"delete-file-references\" : [ 0 ],\n"
+ + " \"residual-filter\" : {\n"
+ + " \"type\" : \"eq\",\n"
+ + " \"term\" : \"id\",\n"
+ + " \"value\" : 1\n"
+ + " }\n"
+ + " } ]\n"
+ + "}";
+
+ String json = PlanTableScanResponseParser.toJson(response, true);
+ assertThat(json).isEqualTo(expectedJson);
+
+ PlanTableScanResponse fromResponse =
+ PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID,
false);
+ PlanTableScanResponse copyResponse =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(fromResponse.planStatus())
+ .withPlanId(fromResponse.planId())
+ .withPlanTasks(fromResponse.planTasks())
+ .withDeleteFiles(fromResponse.deleteFiles())
+ .withFileScanTasks(fromResponse.fileScanTasks())
+ .withSpecsById(PARTITION_SPECS_BY_ID)
+ .withCredentials(credentials)
+ .build();
+
+ assertThat(PlanTableScanResponseParser.toJson(copyResponse,
true)).isEqualTo(expectedJson);
+ }
}