amogh-jahagirdar commented on code in PR #13004:
URL: https://github.com/apache/iceberg/pull/13004#discussion_r2244413365


##########
core/src/main/java/org/apache/iceberg/rest/requests/FetchScanTasksRequestParser.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.requests;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FetchScanTasksRequestParser {
+  private static final String PLAN_TASK = "plan-task";
+
+  private FetchScanTasksRequestParser() {}
+
+  public static String toJson(FetchScanTasksRequest request) {
+    return toJson(request, false);
+  }
+
+  public static String toJson(FetchScanTasksRequest request, boolean pretty) {
+    return JsonUtil.generate(gen -> toJson(request, gen), pretty);
+  }
+
+  public static void toJson(FetchScanTasksRequest request, JsonGenerator gen) throws IOException {
+    Preconditions.checkArgument(null != request, "Invalid request: fetchScanTasks request null");

Review Comment:
   Could we clean up these error messages so they're more consistent with the existing pattern in the code? For example:
   
   "Invalid fetchScanTasks request: null"



##########
core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponse.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import org.apache.iceberg.rest.RESTResponse;
+
+public interface TableScanResponse extends RESTResponse {}

Review Comment:
   I'm not sure this marker interface is really doing anything at this point. Should we make this an abstract class and extract the common structures across all the scan responses?
   
   Here are the fields I've identified that are common:
   
     private final List<String> planTasks;
     private final List<FileScanTask> fileScanTasks;
     private final List<DeleteFile> deleteFiles;
     private final Map<Integer, PartitionSpec> specsById;
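   
   For illustration, a minimal sketch of what such a base class might look like. The names `BaseScanTaskResponse`, `PlanResponseSketch`, and `validateCommon` are hypothetical, and the type parameters stand in for `FileScanTask`, `DeleteFile`, and `PartitionSpec` so the snippet compiles on its own. The shared check mirrors the deleteFiles rule already in `PlanTableScanResponse.validate()`:
   
   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical base class. F, D and S stand in for FileScanTask, DeleteFile
   // and PartitionSpec so that the sketch compiles without Iceberg on the classpath.
   abstract class BaseScanTaskResponse<F, D, S> {
     private final List<String> planTasks;
     private final List<F> fileScanTasks;
     private final List<D> deleteFiles;
     private final Map<Integer, S> specsById;

     protected BaseScanTaskResponse(
         List<String> planTasks,
         List<F> fileScanTasks,
         List<D> deleteFiles,
         Map<Integer, S> specsById) {
       this.planTasks = planTasks;
       this.fileScanTasks = fileScanTasks;
       this.deleteFiles = deleteFiles;
       this.specsById = specsById;
     }

     public List<String> planTasks() {
       return planTasks;
     }

     public List<F> fileScanTasks() {
       return fileScanTasks;
     }

     public List<D> deleteFiles() {
       return deleteFiles;
     }

     public Map<Integer, S> specsById() {
       return specsById;
     }

     // Check shared by every subclass: delete files are only valid alongside file scan tasks.
     protected void validateCommon() {
       boolean noTasks = fileScanTasks == null || fileScanTasks.isEmpty();
       boolean noDeletes = deleteFiles == null || deleteFiles.isEmpty();
       if (noTasks && !noDeletes) {
         throw new IllegalArgumentException(
             "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them");
       }
     }
   }

   // Hypothetical subclass that only adds its own field on top of the shared ones.
   class PlanResponseSketch extends BaseScanTaskResponse<String, String, String> {
     private final String planId;

     PlanResponseSketch(
         String planId,
         List<String> planTasks,
         List<String> fileScanTasks,
         List<String> deleteFiles,
         Map<Integer, String> specsById) {
       super(planTasks, fileScanTasks, deleteFiles, specsById);
       this.planId = planId;
       validateCommon();
     }

     String planId() {
       return planId;
     }
   }
   ```
   
   Each concrete response would then keep only its own fields (plan status, plan id) and its status-specific validation.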



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -215,6 +215,7 @@ protected RESTCatalog initCatalog(String catalogName, 
Map<String, String> additi
             "catalog:12345",
             "header.test-header",
             "test-value");
+

Review Comment:
   nit: unnecessary whitespace change



##########
core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.PlanStatus;
+
+public class PlanTableScanResponse implements TableScanResponse {
+  private final PlanStatus planStatus;
+  private final String planId;
+  private final List<String> planTasks;
+  private final List<FileScanTask> fileScanTasks;
+  private final List<DeleteFile> deleteFiles;
+  private final Map<Integer, PartitionSpec> specsById;
+
+  private PlanTableScanResponse(
+      PlanStatus planStatus,
+      String planId,
+      List<String> planTasks,
+      List<FileScanTask> fileScanTasks,
+      List<DeleteFile> deleteFiles,
+      Map<Integer, PartitionSpec> specsById) {
+    this.planStatus = planStatus;
+    this.planId = planId;
+    this.planTasks = planTasks;
+    this.fileScanTasks = fileScanTasks;
+    this.deleteFiles = deleteFiles;
+    this.specsById = specsById;
+    validate();
+  }
+
+  public PlanStatus planStatus() {
+    return planStatus;
+  }
+
+  public String planId() {
+    return planId;
+  }
+
+  public List<String> planTasks() {
+    return planTasks;
+  }
+
+  public List<FileScanTask> fileScanTasks() {
+    return fileScanTasks;
+  }
+
+  public List<DeleteFile> deleteFiles() {
+    return deleteFiles;
+  }
+
+  public Map<Integer, PartitionSpec> specsById() {
+    return specsById;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("planStatus", planStatus)
+        .add("planId", planId)
+        .toString();
+  }
+
+  @Override
+  public void validate() {
+    Preconditions.checkArgument(
+        planStatus() != null, "Invalid response: plan status must be defined");
+    Preconditions.checkArgument(
+        planStatus() != PlanStatus.SUBMITTED || planId() != null,
+        "Invalid response: plan id should be defined when status is 
'submitted'");
+    Preconditions.checkArgument(
+        planStatus() != PlanStatus.CANCELLED,
+        "Invalid response: 'cancelled' is not a valid status for 
planTableScan");
+    Preconditions.checkArgument(
+        planStatus() == PlanStatus.COMPLETED || (planTasks() == null && 
fileScanTasks() == null),
+        "Invalid response: tasks can only be returned in a 'completed' 
status");
+    Preconditions.checkArgument(
+        planStatus() == PlanStatus.SUBMITTED || planId() == null,
+        "Invalid response: plan id can only be returned in a 'submitted' 
status");
+    if (fileScanTasks() == null || fileScanTasks.isEmpty()) {
+      Preconditions.checkArgument(
+          (deleteFiles() == null || deleteFiles().isEmpty()),
+          "Invalid response: deleteFiles should only be returned with 
fileScanTasks that reference them");
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private Builder() {}
+
+    private PlanStatus planStatus;
+    private String planId;
+    private List<String> planTasks;
+    private List<FileScanTask> fileScanTasks;
+    private List<DeleteFile> deleteFiles;
+    private Map<Integer, PartitionSpec> specsById;
+
+    public Builder withPlanStatus(PlanStatus status) {
+      this.planStatus = status;
+      return this;
+    }
+
+    public Builder withPlanId(String id) {
+      this.planId = id;
+      return this;
+    }
+
+    public Builder withPlanTasks(List<String> tasks) {
+      this.planTasks = tasks;
+      return this;
+    }
+
+    public Builder withFileScanTasks(List<FileScanTask> tasks) {
+      this.fileScanTasks = tasks;
+      return this;
+    }
+
+    public Builder withDeleteFiles(List<DeleteFile> deletes) {
+      this.deleteFiles = deletes;
+      return this;
+    }
+
+    public Builder withSpecsById(Map<Integer, PartitionSpec> specs) {
+      this.specsById = specs;
+      return this;

Review Comment:
   Similar to the scan response structure, I wonder if it makes sense to have a separate base builder for setting fileScanTasks, deleteFiles, and specsById, since those are common across all task responses that get sent back. I think it'd reduce a fair bit of duplication.
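   
   One possible shape for that, sketched with a self-typed builder so chained calls keep the concrete builder type. `BaseScanTaskBuilder` and `PlanBuilderSketch` are hypothetical names, and `String` stands in for `FileScanTask`, `DeleteFile`, and `PartitionSpec` so the snippet is self-contained:
   
   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical self-typed base builder. B is the concrete builder type,
   // so the shared setters return it and chaining is not broken.
   abstract class BaseScanTaskBuilder<B extends BaseScanTaskBuilder<B>> {
     protected List<String> fileScanTasks; // stand-in for List<FileScanTask>
     protected List<String> deleteFiles; // stand-in for List<DeleteFile>
     protected Map<Integer, String> specsById; // stand-in for Map<Integer, PartitionSpec>

     protected abstract B self();

     public B withFileScanTasks(List<String> tasks) {
       this.fileScanTasks = tasks;
       return self();
     }

     public B withDeleteFiles(List<String> deletes) {
       this.deleteFiles = deletes;
       return self();
     }

     public B withSpecsById(Map<Integer, String> specs) {
       this.specsById = specs;
       return self();
     }
   }

   // Hypothetical concrete builder: only the response-specific setter lives here.
   class PlanBuilderSketch extends BaseScanTaskBuilder<PlanBuilderSketch> {
     private String planId;

     @Override
     protected PlanBuilderSketch self() {
       return this;
     }

     public PlanBuilderSketch withPlanId(String id) {
       this.planId = id;
       return this;
     }

     // Stand-in for build(): summarizes what was set.
     public String describe() {
       return planId + ":" + fileScanTasks + ":" + deleteFiles;
     }
   }
   ```
   
   With this shape, shared and specific setters can be mixed in any order, e.g. `new PlanBuilderSketch().withFileScanTasks(tasks).withPlanId(id)`.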



##########
core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.JsonUtil;
+
+public class RESTFileScanTaskParser {
+  private static final String DATA_FILE = "data-file";
+  private static final String DELETE_FILE_REFERENCES = 
"delete-file-references";
+  private static final String RESIDUAL_FILTER = "residual-filter";
+
+  private RESTFileScanTaskParser() {}
+
+  public static void toJson(
+      FileScanTask fileScanTask,
+      Set<Integer> deleteFileReferences,
+      PartitionSpec partitionSpec,
+      JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeStartObject();
+    generator.writeFieldName(DATA_FILE);
+    ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator);
+    if (deleteFileReferences != null) {
+      JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, 
generator);
+    }
+
+    if (fileScanTask.residual() != null) {
+      generator.writeFieldName(RESIDUAL_FILTER);
+      ExpressionParser.toJson(fileScanTask.residual(), generator);
+    }
+    generator.writeEndObject();
+  }
+
+  public static FileScanTask fromJson(
+      JsonNode jsonNode,
+      List<DeleteFile> allDeleteFiles,
+      Map<Integer, PartitionSpec> specsById,
+      boolean isCaseSensitive) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file 
scan task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for file scan task: non-object 
(%s)", jsonNode);
+
+    DataFile dataFile =
+        (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, 
jsonNode), specsById);
+    int specId = dataFile.specId();
+
+    DeleteFile[] deleteFiles = null;
+    Set<Integer> deleteFileReferences = Sets.newHashSet();
+    if (jsonNode.has(DELETE_FILE_REFERENCES)) {
+      deleteFileReferences.addAll(JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode));
+      ImmutableList.Builder<GenericDeleteFile> builder = ImmutableList.builder();
+      deleteFileReferences.forEach(
+          delIdx -> builder.add((GenericDeleteFile) allDeleteFiles.get(delIdx)));
+      deleteFiles = builder.build().toArray(new GenericDeleteFile[0]);
+    }

Review Comment:
   Sorry, I remember I proposed this code to @rahil-c a while back, but I think we can probably simplify this further:
   
   
   ```
   if (jsonNode.has(DELETE_FILE_REFERENCES)) {
       List<Integer> indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, jsonNode);
       deleteFiles = indices.stream()
           .map(index -> (GenericDeleteFile) allDeleteFiles.get(index))
           .toArray(GenericDeleteFile[]::new);
   }
   ```



##########
core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.FILE_A_DELETES;
+import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.util.List;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.RESTSerializers;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestFetchPlanningResultResponseParser {
+
+  private static final JsonFactory FACTORY =
+      new JsonFactoryBuilder()
+          .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)
+          .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false)
+          .build();
+  private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
+

Review Comment:
   This exists because we don't currently expose `RESTObjectMapper`, right?



##########
core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.JsonUtil;
+
+public class RESTFileScanTaskParser {
+  private static final String DATA_FILE = "data-file";
+  private static final String DELETE_FILE_REFERENCES = 
"delete-file-references";
+  private static final String RESIDUAL_FILTER = "residual-filter";
+
+  private RESTFileScanTaskParser() {}
+
+  public static void toJson(
+      FileScanTask fileScanTask,
+      Set<Integer> deleteFileReferences,
+      PartitionSpec partitionSpec,
+      JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeStartObject();
+    generator.writeFieldName(DATA_FILE);
+    ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator);
+    if (deleteFileReferences != null) {
+      JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, 
generator);
+    }
+
+    if (fileScanTask.residual() != null) {
+      generator.writeFieldName(RESIDUAL_FILTER);
+      ExpressionParser.toJson(fileScanTask.residual(), generator);
+    }
+    generator.writeEndObject();
+  }
+
+  public static FileScanTask fromJson(
+      JsonNode jsonNode,
+      List<DeleteFile> allDeleteFiles,
+      Map<Integer, PartitionSpec> specsById,
+      boolean isCaseSensitive) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file 
scan task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for file scan task: non-object 
(%s)", jsonNode);
+
+    DataFile dataFile =
+        (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, 
jsonNode), specsById);
+    int specId = dataFile.specId();
+
+    DeleteFile[] deleteFiles = null;
+    Set<Integer> deleteFileReferences = Sets.newHashSet();
+    if (jsonNode.has(DELETE_FILE_REFERENCES)) {
+      
deleteFileReferences.addAll(JsonUtil.getIntegerList(DELETE_FILE_REFERENCES, 
jsonNode));
+      ImmutableList.Builder<GenericDeleteFile> builder = 
ImmutableList.builder();
+      deleteFileReferences.forEach(
+          delIdx -> builder.add((GenericDeleteFile) 
allDeleteFiles.get(delIdx)));
+      deleteFiles = builder.build().toArray(new GenericDeleteFile[0]);
+    }
+
+    Expression filter = null;
+    if (jsonNode.has(RESIDUAL_FILTER)) {
+      filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL_FILTER));
+    }
+
+    String schemaString = SchemaParser.toJson(specsById.get(specId).schema());
+    String specString = PartitionSpecParser.toJson(specsById.get(specId));

Review Comment:
   Optimization for later: the parser implementation may want to cache the serialized spec/schema strings so that CPU cycles don't get burned on serializing the same schema/spec many times.
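   
   A rough sketch of the caching idea, keyed by spec ID. `SerializedSpecCache` is a hypothetical helper, and the serializer is passed in as a function so the snippet is self-contained; in the parser it would presumably wrap something like `PartitionSpecParser.toJson(specsById.get(id))`. It is not thread-safe as written:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.IntFunction;

   // Hypothetical helper: memoizes one serialized string per spec ID so the
   // same spec (or schema) is serialized at most once per cache instance.
   class SerializedSpecCache {
     private final Map<Integer, String> cache = new HashMap<>();
     private final IntFunction<String> serializer;

     SerializedSpecCache(IntFunction<String> serializer) {
       this.serializer = serializer;
     }

     String get(int specId) {
       // computeIfAbsent only invokes the serializer on a cache miss.
       return cache.computeIfAbsent(specId, id -> serializer.apply(id));
     }
   }
   ```
   
   One cache instance per parse call (or per response) would be enough to avoid re-serializing for every file scan task.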



##########
core/src/main/java/org/apache/iceberg/ContentFileParser.java:
##########
@@ -134,29 +134,31 @@ static void toJson(ContentFile<?> contentFile, 
PartitionSpec spec, JsonGenerator
     generator.writeEndObject();
   }
 
-  static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
+  static ContentFile<?> fromJson(JsonNode jsonNode, Map<Integer, PartitionSpec> specsById) {

Review Comment:
   It may end up being less intrusive to have 2 methods:
   
   fromJson(JsonNode jsonNode, Map<Integer, PartitionSpec> specsById)
   fromJson(JsonNode jsonNode, PartitionSpec spec)
   
   The second calls the first with ImmutableMap.of(spec.specId(), spec).
   
   Then all the places that use the old-style method don't need to change.
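   
   Roughly, the delegation could look like the sketch below. `SpecStub` and `ContentFileParserSketch` are hypothetical stand-ins, an `int` stands in for the spec ID read from the JSON node, and the JDK's `Map.of` is used in place of `ImmutableMap.of` so the snippet has no dependencies:
   
   ```java
   import java.util.Map;

   // Stand-in for PartitionSpec; only specId() matters for this sketch.
   class SpecStub {
     private final int specId;

     SpecStub(int specId) {
       this.specId = specId;
     }

     int specId() {
       return specId;
     }
   }

   class ContentFileParserSketch {
     private ContentFileParserSketch() {}

     // New-style entry point: looks up the spec by the ID found in the JSON.
     static String fromJson(int specIdInJson, Map<Integer, SpecStub> specsById) {
       SpecStub spec = specsById.get(specIdInJson);
       if (spec == null) {
         throw new IllegalArgumentException("Invalid spec ID: " + specIdInJson);
       }
       return "file(spec=" + spec.specId() + ")";
     }

     // Old-style entry point kept for existing callers; wraps the single spec in a map.
     static String fromJson(int specIdInJson, SpecStub spec) {
       return fromJson(specIdInJson, Map.of(spec.specId(), spec));
     }
   }
   ```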
   



##########
core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.FILE_A_DELETES;
+import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.List;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.rest.PlanStatus;
+import org.junit.jupiter.api.Test;
+
+public class TestPlanTableScanResponseParser {
+
+  @Test
+  public void nullAndEmptyCheck() {
+    assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid response: planTableScanResponse null");
+
+    assertThatThrownBy(
+            () ->
+                PlanTableScanResponseParser.fromJson((JsonNode) null, 
PARTITION_SPECS_BY_ID, false))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse planTableScan response from empty or null 
object");
+  }
+
+  @Test
+  public void roundTripSerdeWithEmptyObject() {
+
+    assertThatThrownBy(() -> PlanTableScanResponse.builder().build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid response: plan status must be defined");
+
+    String emptyJson = "{ }";
+    assertThatThrownBy(
+            () -> PlanTableScanResponseParser.fromJson(emptyJson, 
PARTITION_SPECS_BY_ID, false))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse planTableScan response from empty or null 
object");
+  }
+

Review Comment:
   Not needed in this PR, but I wonder if there's a pattern where we have a simple base test class to which we supply the response parser and the expected error message. Pretty much every parser would be expected to fail on null/empty input, so we can extract all of that and pass those two in.
   
   At the least, it'd be nice to parameterize as much as possible here.



##########
core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableScanResponseParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FetchPlanningResultResponseParser {
+  private static final String PLAN_STATUS = "plan-status";
+  private static final String PLAN_TASKS = "plan-tasks";
+
+  private FetchPlanningResultResponseParser() {}
+
+  public static String toJson(FetchPlanningResultResponse response) {
+    return toJson(response, false);
+  }
+
+  public static String toJson(FetchPlanningResultResponse response, boolean 
pretty) {
+    return JsonUtil.generate(gen -> toJson(response, gen), pretty);
+  }
+
+  public static void toJson(FetchPlanningResultResponse response, 
JsonGenerator gen)
+      throws IOException {
+    Preconditions.checkArgument(
+        null != response, "Invalid response: fetchPanningResultResponse null");
+    Preconditions.checkArgument(
+        response.specsById() != null
+            || (response.fileScanTasks() == null || 
response.fileScanTasks().isEmpty()),
+        "Cannot serialize fileScanTasks in fetchingPlanningResultResponse 
without specsById");
+    gen.writeStartObject();
+    gen.writeStringField(PLAN_STATUS, response.planStatus().status());
+    if (response.planTasks() != null) {
+      JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen);
+    }
+
+    TableScanResponseParser.serializeScanTasks(
+        response.fileScanTasks(), response.deleteFiles(), 
response.specsById(), gen);
+    gen.writeEndObject();
+  }
+
+  public static FetchPlanningResultResponse fromJson(
+      String json, Map<Integer, PartitionSpec> specsById, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid response: fetchPanningResultResponse null");

Review Comment:
   Same as before: more consistent error messaging, and there's a small typo here too ("fetchPanningResultResponse"):
   
   "Invalid fetchPlanningResult response: null"



##########
core/src/main/java/org/apache/iceberg/RESTFileScanTaskParser.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.JsonUtil;
+
+public class RESTFileScanTaskParser {
+  private static final String DATA_FILE = "data-file";
+  private static final String DELETE_FILE_REFERENCES = 
"delete-file-references";
+  private static final String RESIDUAL_FILTER = "residual-filter";
+
+  private RESTFileScanTaskParser() {}
+
+  public static void toJson(
+      FileScanTask fileScanTask,
+      Set<Integer> deleteFileReferences,
+      PartitionSpec partitionSpec,
+      JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeStartObject();
+    generator.writeFieldName(DATA_FILE);
+    ContentFileParser.toJson(fileScanTask.file(), partitionSpec, generator);
+    if (deleteFileReferences != null) {
+      JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, 
generator);
+    }
+
+    if (fileScanTask.residual() != null) {
+      generator.writeFieldName(RESIDUAL_FILTER);
+      ExpressionParser.toJson(fileScanTask.residual(), generator);
+    }
+    generator.writeEndObject();

Review Comment:
   nit: add a newline after the `if` block



##########
core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.responses;
+
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.FILE_A_DELETES;
+import static org.apache.iceberg.TestBase.PARTITION_SPECS_BY_ID;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.List;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.junit.jupiter.api.Test;
+
+public class TestFetchScanTasksResponseParser {
+
+  @Test
+  public void nullAndEmptyCheck() {
+    assertThatThrownBy(() -> FetchScanTasksResponseParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid response: fetchScanTasksResponse null");
+
+    assertThatThrownBy(
+            () ->
+                FetchScanTasksResponseParser.fromJson(
+                    (JsonNode) null, PARTITION_SPECS_BY_ID, false))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid response: fetchScanTasksResponse null");
+  }
+
+  @Test
+  public void roundTripSerdeWithEmptyObject() {
+    assertThatThrownBy(
+            () -> FetchScanTasksResponseParser.toJson(FetchScanTasksResponse.builder().build()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid response: planTasks and fileScanTask cannot both be null");
+
+    String emptyJson = "{ }";
+    assertThatThrownBy(
+            () -> FetchScanTasksResponseParser.fromJson(emptyJson, PARTITION_SPECS_BY_ID, false))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid response: fetchScanTasksResponse null");
+  }
+
+  @Test
+  public void roundTripSerdeWithPlanTasks() {
+    String expectedJson = "{\"plan-tasks\":[\"task1\",\"task2\"]}";
+    String json =
+        FetchScanTasksResponseParser.toJson(
+            FetchScanTasksResponse.builder().withPlanTasks(List.of("task1", "task2")).build());
+    assertThat(json).isEqualTo(expectedJson);
+
+    FetchScanTasksResponse fromResponse =
+        FetchScanTasksResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false);
+
+    // can't do an equality comparison on PlanTableScanRequest because we don't implement
+    // equals/hashcode
+    assertThat(FetchScanTasksResponseParser.toJson(fromResponse, false)).isEqualTo(expectedJson);
+  }
+
+  @Test
+  public void roundTripSerdeWithDeleteFilesNoFileScanTasksPresent() {
+    assertThatThrownBy(
+            () ->
+                FetchScanTasksResponse.builder()
+                    .withPlanTasks(List.of("task1", "task2"))
+                    .withDeleteFiles(List.of(FILE_A_DELETES))
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them");
+
+    String invalidJson =
+        "{\"plan-tasks\":[\"task1\",\"task2\"],"
+            + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+            + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+            + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}]"
+            + "}";
+
+    assertThatThrownBy(
+            () -> FetchScanTasksResponseParser.fromJson(invalidJson, PARTITION_SPECS_BY_ID, false))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them");
+  }
+
+  @Test
+  public void roundTripSerdeWithFileScanTasks() {
+    ResidualEvaluator residualEvaluator =
+        ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true);
+    FileScanTask fileScanTask =
+        new BaseFileScanTask(
+            FILE_A,
+            new DeleteFile[] {FILE_A_DELETES},
+            SchemaParser.toJson(SCHEMA),
+            PartitionSpecParser.toJson(SPEC),
+            residualEvaluator);
+
+    FetchScanTasksResponse response =
+        FetchScanTasksResponse.builder()
+            .withFileScanTasks(List.of(fileScanTask))
+            .withDeleteFiles(List.of(FILE_A_DELETES))
+            // assume you have set this already
+            .withSpecsById(PARTITION_SPECS_BY_ID)
+            .build();
+
+    String expectedToJson =
+        "{"
+            + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+            + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+            + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}],"
+            + "\"file-scan-tasks\":["
+            + "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+            + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},"
+            + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+            + "\"delete-file-references\":[0],"
+            + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]"
+            + "}";
+
+    String json = FetchScanTasksResponseParser.toJson(response, false);
+    assertThat(json).isEqualTo(expectedToJson);
+
+    FetchScanTasksResponse fromResponse =
+        FetchScanTasksResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false);
+    // Need to make a new response with partitionSpec set
+    FetchScanTasksResponse copyResponse =
+        FetchScanTasksResponse.builder()
+            .withPlanTasks(fromResponse.planTasks())
+            .withDeleteFiles(fromResponse.deleteFiles())
+            .withFileScanTasks(fromResponse.fileScanTasks())
+            .withSpecsById(PARTITION_SPECS_BY_ID)
+            .build();
+
+    // can't do an equality comparison on PlanTableScanRequest because we don't implement
+    // equals/hashcode

Review Comment:
   Nit: we generally avoid first person (e.g. "we") in code comments. Can we keep this consistent with the other messages? For example:
   
   ```
       // can't do an equality comparison on PlanTableScanRequest because <xyz> fields don't implement equals/hashcode
   ```


