amogh-jahagirdar commented on code in PR #13004:
URL: https://github.com/apache/iceberg/pull/13004#discussion_r2268104234


##########
core/src/main/java/org/apache/iceberg/rest/responses/BaseScanResponse.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.rest.RESTResponse;
+
+public abstract class BaseScanResponse implements RESTResponse {

Review Comment:
   `BaseScanTaskResponse`? 



##########
core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.PlanStatus;
+
+public class PlanTableScanResponse extends BaseScanResponse {
+  private final PlanStatus planStatus;
+  private final String planId;
+
+  private PlanTableScanResponse(
+      PlanStatus planStatus,
+      String planId,
+      List<String> planTasks,
+      List<FileScanTask> fileScanTasks,
+      List<DeleteFile> deleteFiles,
+      Map<Integer, PartitionSpec> specsById) {
+    super(planTasks, fileScanTasks, deleteFiles, specsById);
+    this.planStatus = planStatus;
+    this.planId = planId;
+    validate();
+  }
+
+  public PlanStatus planStatus() {
+    return planStatus;
+  }
+
+  public String planId() {
+    return planId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("planStatus", planStatus())
+        .add("planId", planId())
+        .toString();
+  }
+
+  @Override
+  public void validate() {
+    Preconditions.checkArgument(
+        planStatus() != null, "Invalid response: plan status must be defined");
+    Preconditions.checkArgument(
+        planStatus() != PlanStatus.SUBMITTED || planId() != null,
+        "Invalid response: plan id should be defined when status is 'submitted'");
+    Preconditions.checkArgument(
+        planStatus() != PlanStatus.CANCELLED,
+        "Invalid response: 'cancelled' is not a valid status for planTableScan");
+    Preconditions.checkArgument(
+        planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null),
+        "Invalid response: tasks can only be returned in a 'completed' status");

Review Comment:
   nit on some of these messages: I don't think the word "returned" is the 
right way to express the failure. Yes, this will typically fail because a 
server returned invalid data and the client can't build the model. But more 
generally, error messages at this abstraction shouldn't care about the 
overall context. What if I'm implementing a server and I just build a bad 
`PlanTableScanResponse`? Nothing is really being returned in that case.
   
   So I think something like "Invalid response: tasks can only be *defined* 
when status is completed" is better.
   
   I'd go through and double check the other messages too, because they 
assume some context (albeit totally reasonable/typical context).
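   
   A minimal sketch of what context-neutral wording could look like. The 
`PlanStatus` enum, the `PlanResponseChecks` class, and the `check` helper 
below are hypothetical stand-ins so the snippet compiles on its own; they are 
not the PR's classes, and the exact wording is only a suggestion.

```java
import java.util.List;

// Hypothetical stand-in for the real PlanStatus enum, for illustration only.
enum PlanStatus {
  SUBMITTED,
  COMPLETED,
  CANCELLED,
  FAILED
}

final class PlanResponseChecks {
  private PlanResponseChecks() {}

  // Same conditions as validate() above, but each message describes the
  // object's state ("defined") rather than the transport ("returned").
  static void validate(
      PlanStatus status, String planId, List<String> planTasks, List<String> fileScanTasks) {
    check(status != null, "Invalid response: plan status must be defined");
    check(
        status != PlanStatus.SUBMITTED || planId != null,
        "Invalid response: plan id must be defined when status is 'submitted'");
    check(
        status != PlanStatus.CANCELLED,
        "Invalid response: 'cancelled' is not a valid status for planTableScan");
    check(
        status == PlanStatus.COMPLETED || (planTasks == null && fileScanTasks == null),
        "Invalid response: tasks can only be defined when status is 'completed'");
  }

  // Hypothetical replacement for Preconditions.checkArgument so the sketch
  // has no external dependencies.
  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  public static void main(String[] args) {
    try {
      // A 'submitted' response that also carries tasks is rejected.
      validate(PlanStatus.SUBMITTED, "plan-1", List.of("task-1"), null);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   The messages read the same whether the object was parsed by a client or 
built directly by a server implementation.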



##########
core/src/main/java/org/apache/iceberg/rest/responses/BaseScanResponse.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.rest.RESTResponse;
+
+public abstract class BaseScanResponse implements RESTResponse {
+
+  private final List<String> planTasks;
+  private final List<FileScanTask> fileScanTasks;
+  private final List<DeleteFile> deleteFiles;
+  private final Map<Integer, PartitionSpec> specsById;
+
+  protected BaseScanResponse(
+      List<String> planTasks,
+      List<FileScanTask> fileScanTasks,
+      List<DeleteFile> deleteFiles,
+      Map<Integer, PartitionSpec> specsById) {
+    this.planTasks = planTasks;
+    this.fileScanTasks = fileScanTasks;
+    this.deleteFiles = deleteFiles;
+    this.specsById = specsById;
+  }
+
+  public List<String> planTasks() {
+    return planTasks;
+  }
+
+  public List<FileScanTask> fileScanTasks() {
+    return fileScanTasks;
+  }
+
+  public List<DeleteFile> deleteFiles() {
+    return deleteFiles;
+  }
+
+  public Map<Integer, PartitionSpec> specsById() {
+    return specsById;
+  }
+
+  public abstract static class Builder<B extends Builder<B, R>, R extends BaseScanResponse> {
+    private List<String> planTasks;
+    private List<FileScanTask> fileScanTasks;
+    private List<DeleteFile> deleteFiles;
+    private Map<Integer, PartitionSpec> specsById;
+
+    protected Builder() {}
+
+    @SuppressWarnings("unchecked")
+    public B self() {
+      return (B) this;
+    }

Review Comment:
   I'd probably inline this cast in the response instead of having a separate 
`self()`.
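   
   Roughly what I have in mind, as a simplified sketch. `ScanResponse`, 
`FetchResponse`, `FetchBuilder`, and `withPlanTasks` are hypothetical names 
and only one field is kept; the point is just where the unchecked cast lives.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the base response; only the
// self-typed builder shape matters here.
abstract class ScanResponse {
  private final List<String> planTasks;

  protected ScanResponse(List<String> planTasks) {
    this.planTasks = planTasks;
  }

  List<String> planTasks() {
    return planTasks;
  }

  abstract static class Builder<B extends Builder<B, R>, R extends ScanResponse> {
    private List<String> planTasks;

    protected Builder() {}

    // The unchecked cast is inlined in the setter; there is no self() method.
    @SuppressWarnings("unchecked")
    public B withPlanTasks(List<String> tasks) {
      this.planTasks = tasks;
      return (B) this;
    }

    protected List<String> planTasks() {
      return planTasks;
    }

    public abstract R build();
  }
}

// Hypothetical concrete response with its own builder subtype.
final class FetchResponse extends ScanResponse {
  private FetchResponse(List<String> planTasks) {
    super(planTasks);
  }

  static FetchBuilder builder() {
    return new FetchBuilder();
  }

  static final class FetchBuilder extends ScanResponse.Builder<FetchBuilder, FetchResponse> {
    @Override
    public FetchResponse build() {
      return new FetchResponse(planTasks());
    }
  }
}
```

   Chained calls such as 
`FetchResponse.builder().withPlanTasks(tasks).build()` still return the 
concrete builder type, so nothing is lost by dropping `self()`.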



##########
core/src/main/java/org/apache/iceberg/rest/responses/BaseScanResponse.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.rest.RESTResponse;
+
+public abstract class BaseScanResponse implements RESTResponse {
+
+  private final List<String> planTasks;
+  private final List<FileScanTask> fileScanTasks;
+  private final List<DeleteFile> deleteFiles;
+  private final Map<Integer, PartitionSpec> specsById;
+
+  protected BaseScanResponse(
+      List<String> planTasks,
+      List<FileScanTask> fileScanTasks,
+      List<DeleteFile> deleteFiles,
+      Map<Integer, PartitionSpec> specsById) {
+    this.planTasks = planTasks;
+    this.fileScanTasks = fileScanTasks;
+    this.deleteFiles = deleteFiles;
+    this.specsById = specsById;
+  }
+
+  public List<String> planTasks() {
+    return planTasks;
+  }
+
+  public List<FileScanTask> fileScanTasks() {
+    return fileScanTasks;
+  }
+
+  public List<DeleteFile> deleteFiles() {
+    return deleteFiles;
+  }

Review Comment:
   I went back and forth on whether we should make defensive copies here, but 
I think this is fine; we're dealing with potentially memory-heavy data files, 
and it's reasonable to expect that a) implementations are already passing 
immutable structures here and b) readers won't read these tasks and then try 
to add more to them.
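   
   To illustrate the trade-off, a small sketch with a hypothetical 
`TaskHolder` that, like the getters above, hands back the caller's list 
without copying it. `String` stands in for the task type.

```java
import java.util.List;

// Hypothetical holder that stores and returns the caller's list as-is.
final class TaskHolder {
  private final List<String> tasks;

  TaskHolder(List<String> tasks) {
    this.tasks = tasks; // no defensive copy
  }

  List<String> tasks() {
    return tasks;
  }

  // Returns true if a reader's attempt to append to the list is rejected.
  static boolean rejectsAppend(TaskHolder holder) {
    try {
      holder.tasks().add("extra");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```

   With an immutable input such as `List.of(...)` the append is rejected, so 
no copy is needed; with a mutable input such as an `ArrayList` the append goes 
through, which is the case we'd be choosing to trust callers on.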



##########
core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class RESTFileScanTaskParser {
+  private static final String DATA_FILE = "data-file";
+  private static final String DELETE_FILE_REFERENCES = "delete-file-references";
+  private static final String RESIDUAL_FILTER = "residual-filter";
+
+  private RESTFileScanTaskParser() {}
+
+  public static void toJson(
+      FileScanTask fileScanTask,
+      Set<Integer> deleteFileReferences,
+      PartitionSpec partitionSpec,
+      JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+    generator.writeStartObject();
+    generator.writeFieldName(DATA_FILE);
+    ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator);
+    if (deleteFileReferences != null) {
+      JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator);
+    }
+
+    if (fileScanTask.residual() != null) {
+      generator.writeFieldName(RESIDUAL_FILTER);
+      ExpressionParser.toJson(fileScanTask.residual(), generator);
+    }
+
+    generator.writeEndObject();
+  }
+
+  public static FileScanTask fromJson(
+      JsonNode jsonNode,
+      List<DeleteFile> allDeleteFiles,
+      Map<Integer, PartitionSpec> specsById,
+      boolean isCaseSensitive) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode);
+
+    DataFile dataFile =
+        (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), specsById);
+    int specId = dataFile.specId();
+
+    DeleteFile[] deleteFiles = null;
+    if (jsonNode.has(DELETE_FILE_REFERENCES)) {
+      List<Integer> indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode);
+      deleteFiles =
+          indices.stream()
+              .map(index -> (GenericDeleteFile) allDeleteFiles.get(index))

Review Comment:
   I think we should be a bit defensive here in case the response has 
`DELETE_FILE_REFERENCES` but has no deletes. Right now, it looks like we return 
null when there are no top-level deletes: 
https://github.com/apache/iceberg/blob/580097c8449bbada1357192680d16363a501fc8c/core/src/main/java/org/apache/iceberg/TableScanResponseParser.java#L55
   
   Could we add a Preconditions check here that `allDeleteFiles != null` 
whenever there are delete file references?
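   
   A rough sketch of the guard, under simplifying assumptions: `String` 
stands in for `DeleteFile`, `DeleteFileRefs.resolve` is a hypothetical helper, 
and a plain `IllegalArgumentException` replaces `Preconditions.checkArgument` 
so the snippet has no dependencies. The message text is only a suggestion.

```java
import java.util.List;
import java.util.stream.Collectors;

final class DeleteFileRefs {
  private DeleteFileRefs() {}

  // Resolves delete-file-references (indices) against the top-level delete
  // file list, failing with a clear message instead of a NullPointerException
  // when the list is missing.
  static List<String> resolve(List<Integer> indices, List<String> allDeleteFiles) {
    if (indices == null || indices.isEmpty()) {
      return List.of();
    }

    if (allDeleteFiles == null) {
      throw new IllegalArgumentException(
          "Invalid delete file references: no delete files are defined");
    }

    return indices.stream()
        .map(index -> allDeleteFiles.get(index))
        .collect(Collectors.toList());
  }
}
```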



##########
core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class RESTFileScanTaskParser {
+  private static final String DATA_FILE = "data-file";
+  private static final String DELETE_FILE_REFERENCES = "delete-file-references";
+  private static final String RESIDUAL_FILTER = "residual-filter";
+
+  private RESTFileScanTaskParser() {}
+
+  public static void toJson(
+      FileScanTask fileScanTask,
+      Set<Integer> deleteFileReferences,
+      PartitionSpec partitionSpec,
+      JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+    generator.writeStartObject();
+    generator.writeFieldName(DATA_FILE);
+    ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator);
+    if (deleteFileReferences != null) {
+      JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator);
+    }
+
+    if (fileScanTask.residual() != null) {
+      generator.writeFieldName(RESIDUAL_FILTER);
+      ExpressionParser.toJson(fileScanTask.residual(), generator);
+    }
+
+    generator.writeEndObject();
+  }
+
+  public static FileScanTask fromJson(
+      JsonNode jsonNode,
+      List<DeleteFile> allDeleteFiles,
+      Map<Integer, PartitionSpec> specsById,
+      boolean isCaseSensitive) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode);
+
+    DataFile dataFile =
+        (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), specsById);
+    int specId = dataFile.specId();
+
+    DeleteFile[] deleteFiles = null;
+    if (jsonNode.has(DELETE_FILE_REFERENCES)) {
+      List<Integer> indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode);
+      deleteFiles =
+          indices.stream()
+              .map(index -> (GenericDeleteFile) allDeleteFiles.get(index))

Review Comment:
   Well actually, I think the real requirement is that there should be at 
least 1 delete file in `allDeleteFiles`. In that case, could we change the 
implementation I linked to return an empty list? Then the Preconditions check 
can just be `!allDeleteFiles.isEmpty()`.
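   
   Sketching the two halves together, with the usual caveats: `String` stands 
in for `DeleteFile`, and `EmptyListDeletes`, `parseDeleteFiles`, and `resolve` 
are hypothetical names, not the linked parser's actual API.

```java
import java.util.List;
import java.util.stream.Collectors;

final class EmptyListDeletes {
  private EmptyListDeletes() {}

  // Stand-in for the linked parser code path: an absent delete-file section
  // yields an empty list rather than null.
  static List<String> parseDeleteFiles(List<String> rawDeleteFiles) {
    return rawDeleteFiles == null ? List.of() : List.copyOf(rawDeleteFiles);
  }

  // Called only when a task carries delete-file-references; with the parser
  // never returning null, a single emptiness check is enough.
  static List<String> resolve(List<Integer> indices, List<String> allDeleteFiles) {
    if (allDeleteFiles.isEmpty()) {
      throw new IllegalArgumentException(
          "Invalid delete file references: no delete files are defined");
    }

    return indices.stream()
        .map(index -> allDeleteFiles.get(index))
        .collect(Collectors.toList());
  }
}
```

   This keeps the null handling in one place (the parser) and lets every 
caller treat "no deletes" and "deletes missing from the payload" the same way.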



##########
core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java:
##########
@@ -470,4 +494,112 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce
       return (T) LoadCredentialsResponseParser.fromJson(jsonNode);
     }
   }
+
+  static class PlanTableScanRequestSerializer<T extends PlanTableScanRequest>
+      extends JsonSerializer<T> {
+    @Override
+    public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      PlanTableScanRequestParser.toJson(request, gen);
+    }
+  }
+
+  static class PlanTableScanRequestDeserializer<T extends PlanTableScanRequest>
+      extends JsonDeserializer<T> {
+    @Override
+    public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      return (T) PlanTableScanRequestParser.fromJson(jsonNode);
+    }
+  }
+
+  static class FetchScanTasksRequestSerializer<T extends FetchScanTasksRequest>
+      extends JsonSerializer<T> {
+    @Override
+    public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      FetchScanTasksRequestParser.toJson(request, gen);
+    }
+  }
+
+  static class FetchScanTasksRequestDeserializer<T extends FetchScanTasksRequest>
+      extends JsonDeserializer<T> {
+    @Override
+    public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      return (T) FetchScanTasksRequestParser.fromJson(jsonNode);
+    }
+  }
+
+  static class PlanTableScanResponseSerializer<T extends PlanTableScanResponse>
+      extends JsonSerializer<T> {
+    @Override
+    public void serialize(T response, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      PlanTableScanResponseParser.toJson(response, gen);
+    }
+  }
+
+  static class PlanTableScanResponseDeserializer<T extends PlanTableScanResponse>
+      extends JsonDeserializer<T> {
+    @Override
+    public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      // Retrieve injectable values
+      @SuppressWarnings("unchecked")
+      Map<Integer, PartitionSpec> specsById =
+          (Map<Integer, PartitionSpec>) context.findInjectableValue("specsById", null, null);
+
+      boolean caseSensitive = (boolean) context.findInjectableValue("caseSensitive", null, null);

Review Comment:
   Minor: Maybe worth having a static class `TableScanResponseContext` with 
these two fields, plus a helper function along the lines of 
`parseScanResponseContext(DeserializationContext context) { return new 
TableScanResponseContext(context.find...); }`
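   
   Something like the following sketch. To keep it dependency-free, a 
`Function<String, Object>` stands in for Jackson's `DeserializationContext` 
lookup and `String` stands in for `PartitionSpec`; both substitutions, and the 
accessor names, are assumptions for illustration only.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical holder for the two injectable values the deserializers need.
final class TableScanResponseContext {
  private final Map<Integer, String> specsById; // String stands in for PartitionSpec
  private final boolean caseSensitive;

  private TableScanResponseContext(Map<Integer, String> specsById, boolean caseSensitive) {
    this.specsById = specsById;
    this.caseSensitive = caseSensitive;
  }

  Map<Integer, String> specsById() {
    return specsById;
  }

  boolean caseSensitive() {
    return caseSensitive;
  }

  // Stand-in for parseScanResponseContext(DeserializationContext): the lookup
  // function plays the role of the context's injectable-value lookup, so the
  // casts and value ids live in exactly one place.
  @SuppressWarnings("unchecked")
  static TableScanResponseContext parse(Function<String, Object> injectables) {
    Map<Integer, String> specs = (Map<Integer, String>) injectables.apply("specsById");
    boolean caseSensitive = (Boolean) injectables.apply("caseSensitive");
    return new TableScanResponseContext(specs, caseSensitive);
  }
}
```

   Each response deserializer could then call the helper once instead of 
repeating the two lookups and the unchecked cast.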



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
