amogh-jahagirdar commented on code in PR #13004: URL: https://github.com/apache/iceberg/pull/13004#discussion_r2268104234
########## core/src/main/java/org/apache/iceberg/rest/responses/BaseScanResponse.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.rest.RESTResponse; + +public abstract class BaseScanResponse implements RESTResponse { Review Comment: `BaseScanTaskResponse`? ########## core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; + +public class PlanTableScanResponse extends BaseScanResponse { + private final PlanStatus planStatus; + private final String planId; + + private PlanTableScanResponse( + PlanStatus planStatus, + String planId, + List<String> planTasks, + List<FileScanTask> fileScanTasks, + List<DeleteFile> deleteFiles, + Map<Integer, PartitionSpec> specsById) { + super(planTasks, fileScanTasks, deleteFiles, specsById); + this.planStatus = planStatus; + this.planId = planId; + validate(); + } + + public PlanStatus planStatus() { + return planStatus; + } + + public String planId() { + return planId; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("planStatus", planStatus()) + .add("planId", planId()) + .toString(); + } + + @Override + public void validate() { + Preconditions.checkArgument( + planStatus() != null, "Invalid response: plan status must be defined"); + Preconditions.checkArgument( + planStatus() != PlanStatus.SUBMITTED || planId() != null, + "Invalid response: plan id should be defined when status is 'submitted'"); + Preconditions.checkArgument( + planStatus() != PlanStatus.CANCELLED, + "Invalid 
response: 'cancelled' is not a valid status for planTableScan"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); Review Comment: nit on some of these messages: I think including the word "returned" in the message isn't really the right way to express the failure. Yes, typically this will be because a server returned some invalid data and the client can't build the model. But more generally, error messages at this abstraction shouldn't care about the overall context. What if I'm implementing a server and I just build a bad `PlanTableScanResponse`? Nothing is really being returned etc. So I think something like "Invalid response: tasks can only be *defined* when status is completed" is better. So I'd probably go through and just double check some of these because we're assuming some context (albeit totally reasonable/typical context) in these error messages. ########## core/src/main/java/org/apache/iceberg/rest/responses/BaseScanResponse.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.rest.RESTResponse; + +public abstract class BaseScanResponse implements RESTResponse { + + private final List<String> planTasks; + private final List<FileScanTask> fileScanTasks; + private final List<DeleteFile> deleteFiles; + private final Map<Integer, PartitionSpec> specsById; + + protected BaseScanResponse( + List<String> planTasks, + List<FileScanTask> fileScanTasks, + List<DeleteFile> deleteFiles, + Map<Integer, PartitionSpec> specsById) { + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + } + + public List<String> planTasks() { + return planTasks; + } + + public List<FileScanTask> fileScanTasks() { + return fileScanTasks; + } + + public List<DeleteFile> deleteFiles() { + return deleteFiles; + } + + public Map<Integer, PartitionSpec> specsById() { + return specsById; + } + + public abstract static class Builder<B extends Builder<B, R>, R extends BaseScanResponse> { + private List<String> planTasks; + private List<FileScanTask> fileScanTasks; + private List<DeleteFile> deleteFiles; + private Map<Integer, PartitionSpec> specsById; + + protected Builder() {} + + @SuppressWarnings("unchecked") + public B self() { + return (B) this; + } Review Comment: I'd probably inline this cast in the response instead of having a separate `self()`. ########## core/src/main/java/org/apache/iceberg/rest/responses/BaseScanResponse.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.rest.RESTResponse; + +public abstract class BaseScanResponse implements RESTResponse { + + private final List<String> planTasks; + private final List<FileScanTask> fileScanTasks; + private final List<DeleteFile> deleteFiles; + private final Map<Integer, PartitionSpec> specsById; + + protected BaseScanResponse( + List<String> planTasks, + List<FileScanTask> fileScanTasks, + List<DeleteFile> deleteFiles, + Map<Integer, PartitionSpec> specsById) { + this.planTasks = planTasks; + this.fileScanTasks = fileScanTasks; + this.deleteFiles = deleteFiles; + this.specsById = specsById; + } + + public List<String> planTasks() { + return planTasks; + } + + public List<FileScanTask> fileScanTasks() { + return fileScanTasks; + } + + public List<DeleteFile> deleteFiles() { + return deleteFiles; + } Review Comment: I went back and forth on whether we should do defensive copies here but I think this is fine; we're dealing with potentially memory heavy data files and it's reasonable to expect that both a.) implementations are passing immutable structures here already and b.) readers won't mess around with reading these tasks and then trying to add more stuff.
########## core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class RESTFileScanTaskParser { + private static final String DATA_FILE = "data-file"; + private static final String DELETE_FILE_REFERENCES = "delete-file-references"; + private static final String RESIDUAL_FILTER = "residual-filter"; + + private RESTFileScanTaskParser() {} + + public static void toJson( + FileScanTask fileScanTask, + Set<Integer> deleteFileReferences, + PartitionSpec partitionSpec, + JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); + 
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + generator.writeFieldName(DATA_FILE); + ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator); + if (deleteFileReferences != null) { + JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator); + } + + if (fileScanTask.residual() != null) { + generator.writeFieldName(RESIDUAL_FILTER); + ExpressionParser.toJson(fileScanTask.residual(), generator); + } + + generator.writeEndObject(); + } + + public static FileScanTask fromJson( + JsonNode jsonNode, + List<DeleteFile> allDeleteFiles, + Map<Integer, PartitionSpec> specsById, + boolean isCaseSensitive) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode); + + DataFile dataFile = + (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), specsById); + int specId = dataFile.specId(); + + DeleteFile[] deleteFiles = null; + if (jsonNode.has(DELETE_FILE_REFERENCES)) { + List<Integer> indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode); + deleteFiles = + indices.stream() + .map(index -> (GenericDeleteFile) allDeleteFiles.get(index)) Review Comment: I think we should be a bit defensive here in case the response has `DELETE_FILE_REFERENCES` but has no deletes. Right now, it looks like we return null https://github.com/apache/iceberg/blob/580097c8449bbada1357192680d16363a501fc8c/core/src/main/java/org/apache/iceberg/TableScanResponseParser.java#L55 when there are no top level deletes. Could we add a Preconditions check here so that, if there are delete file references, we verify `allDeleteFiles != null`?
########## core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class RESTFileScanTaskParser { + private static final String DATA_FILE = "data-file"; + private static final String DELETE_FILE_REFERENCES = "delete-file-references"; + private static final String RESIDUAL_FILTER = "residual-filter"; + + private RESTFileScanTaskParser() {} + + public static void toJson( + FileScanTask fileScanTask, + Set<Integer> deleteFileReferences, + PartitionSpec partitionSpec, + JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); + 
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + generator.writeFieldName(DATA_FILE); + ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator); + if (deleteFileReferences != null) { + JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator); + } + + if (fileScanTask.residual() != null) { + generator.writeFieldName(RESIDUAL_FILTER); + ExpressionParser.toJson(fileScanTask.residual(), generator); + } + + generator.writeEndObject(); + } + + public static FileScanTask fromJson( + JsonNode jsonNode, + List<DeleteFile> allDeleteFiles, + Map<Integer, PartitionSpec> specsById, + boolean isCaseSensitive) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode); + + DataFile dataFile = + (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), specsById); + int specId = dataFile.specId(); + + DeleteFile[] deleteFiles = null; + if (jsonNode.has(DELETE_FILE_REFERENCES)) { + List<Integer> indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode); + deleteFiles = + indices.stream() + .map(index -> (GenericDeleteFile) allDeleteFiles.get(index)) Review Comment: Well actually, I think it's really that there should be at least 1 delete file in `allDeleteFiles`. In that case could we change that implementation I linked to return an empty list. 
Then the preconditions check can just be `!allDeleteFiles.isEmpty()` ########## core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java: ########## @@ -470,4 +494,112 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) LoadCredentialsResponseParser.fromJson(jsonNode); } } + + static class PlanTableScanRequestSerializer<T extends PlanTableScanRequest> + extends JsonSerializer<T> { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanRequestParser.toJson(request, gen); + } + } + + static class PlanTableScanRequestDeserializer<T extends PlanTableScanRequest> + extends JsonDeserializer<T> { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) PlanTableScanRequestParser.fromJson(jsonNode); + } + } + + static class FetchScanTasksRequestSerializer<T extends FetchScanTasksRequest> + extends JsonSerializer<T> { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + FetchScanTasksRequestParser.toJson(request, gen); + } + } + + static class FetchScanTasksRequestDeserializer<T extends FetchScanTasksRequest> + extends JsonDeserializer<T> { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) FetchScanTasksRequestParser.fromJson(jsonNode); + } + } + + static class PlanTableScanResponseSerializer<T extends PlanTableScanResponse> + extends JsonSerializer<T> { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + PlanTableScanResponseParser.toJson(response, gen); + } + } + + static class PlanTableScanResponseDeserializer<T extends PlanTableScanResponse> + extends JsonDeserializer<T> { + @Override + 
public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + // Retrieve injectable values + @SuppressWarnings("unchecked") + Map<Integer, PartitionSpec> specsById = + (Map<Integer, PartitionSpec>) context.findInjectableValue("specsById", null, null); + + boolean caseSensitive = (boolean) context.findInjectableValue("caseSensitive", null, null); Review Comment: Minor: Maybe worth having a static class `TableScanResponseContext` with these two fields, plus a helper function `parseScanResponseContext(DeserializationContext context) { return new TableScanResponseContext(context.find...); }` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
