This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a00f7477d Core: Add readable_metrics columns to files metadata tables 
(#5376)
9a00f7477d is described below

commit 9a00f7477dedac4501fb2de9e1e6d7aa83dc20b7
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Dec 5 10:08:50 2022 -0800

    Core: Add readable_metrics columns to files metadata tables (#5376)
---
 api/src/main/java/org/apache/iceberg/Schema.java   |   9 +
 .../java/org/apache/iceberg/types/TypeUtil.java    |  18 +-
 .../java/org/apache/iceberg/BaseFilesTable.java    |  93 ++++++-
 .../main/java/org/apache/iceberg/MetricsUtil.java  | 300 +++++++++++++++++++++
 .../org/apache/iceberg/TestMetadataTableScans.java |  87 ++++++
 .../spark/source/TestIcebergSourceTablesBase.java  |  75 +++---
 .../spark/source/TestIcebergSourceTablesBase.java  |  77 +++---
 .../spark/extensions/TestMetadataTables.java       | 144 +++++-----
 .../org/apache/iceberg/spark/data/TestHelpers.java |  28 ++
 .../spark/source/TestIcebergSourceTablesBase.java  |  73 +++--
 .../spark/extensions/TestMetadataTables.java       | 144 +++++-----
 .../org/apache/iceberg/spark/data/TestHelpers.java |  28 ++
 .../spark/source/TestIcebergSourceTablesBase.java  |  74 +++--
 .../source/TestMetadataTableReadableMetrics.java   | 286 ++++++++++++++++++++
 14 files changed, 1156 insertions(+), 280 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Schema.java 
b/api/src/main/java/org/apache/iceberg/Schema.java
index 34105a00ad..5e024b7c1c 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -233,6 +233,15 @@ public class Schema implements Serializable {
     return aliasToId;
   }
 
+  /**
+   * Returns a map for this schema between field id and qualified field names.
+   *
+   * @return a map of field id to qualified field names
+   */
+  public Map<Integer, String> idToName() {
+    return lazyIdToName();
+  }
+
   /**
    * Returns the underlying {@link StructType struct type} for this schema.
    *
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java 
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index d2b25fb598..4531f1b8ba 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -142,9 +142,21 @@ public class TypeUtil {
   }
 
   public static Schema join(Schema left, Schema right) {
-    List<Types.NestedField> joinedColumns = Lists.newArrayList();
-    joinedColumns.addAll(left.columns());
-    joinedColumns.addAll(right.columns());
+    List<Types.NestedField> joinedColumns = Lists.newArrayList(left.columns());
+    for (Types.NestedField rightColumn : right.columns()) {
+      Types.NestedField leftColumn = left.findField(rightColumn.fieldId());
+
+      if (leftColumn == null) {
+        joinedColumns.add(rightColumn);
+      } else {
+        Preconditions.checkArgument(
+            leftColumn.equals(rightColumn),
+            "Schemas have different columns with same id: %s, %s",
+            leftColumn,
+            rightColumn);
+      }
+    }
+
     return new Schema(joinedColumns);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java 
b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 4925506fe9..119d4743f1 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -21,6 +21,8 @@ package org.apache.iceberg;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -32,6 +34,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 
 /** Base class logic for files metadata tables */
@@ -48,10 +51,10 @@ abstract class BaseFilesTable extends BaseMetadataTable {
     if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. 
instead, drop the partition
       // field
-      return TypeUtil.selectNot(schema, 
Sets.newHashSet(DataFile.PARTITION_ID));
-    } else {
-      return schema;
+      schema = TypeUtil.selectNot(schema, 
Sets.newHashSet(DataFile.PARTITION_ID));
     }
+
+    return TypeUtil.join(schema, 
MetricsUtil.readableMetricsSchema(table().schema(), schema));
   }
 
   private static CloseableIterable<FileScanTask> planFiles(
@@ -140,15 +143,17 @@ abstract class BaseFilesTable extends BaseMetadataTable {
   }
 
   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
     private final FileIO io;
     private final Map<Integer, PartitionSpec> specsById;
     private final ManifestFile manifest;
-    private final Schema schema;
+    private final Schema dataTableSchema;
+    private final Schema projection;
 
     ManifestReadTask(
         Table table,
         ManifestFile manifest,
-        Schema schema,
+        Schema projection,
         String schemaString,
         String specString,
         ResidualEvaluator residuals) {
@@ -156,26 +161,54 @@ abstract class BaseFilesTable extends BaseMetadataTable {
       this.io = table.io();
       this.specsById = Maps.newHashMap(table.specs());
       this.manifest = manifest;
-      this.schema = schema;
+      this.dataTableSchema = table.schema();
+      this.projection = projection;
     }
 
     @Override
     public CloseableIterable<StructLike> rows() {
-      return CloseableIterable.transform(manifestEntries(), file -> 
(StructLike) file);
+      Types.NestedField readableMetricsField = 
projection.findField(MetricsUtil.READABLE_METRICS);
+
+      if (readableMetricsField == null) {
+        return CloseableIterable.transform(files(projection), file -> 
(StructLike) file);
+      } else {
+        // Remove virtual columns from the file projection and ensure that the 
underlying metrics
+        // used to create those columns are part of the file projection
+        Set<Integer> readableMetricsIds = 
TypeUtil.getProjectedIds(readableMetricsField.type());
+        Schema fileProjection = TypeUtil.selectNot(projection, 
readableMetricsIds);
+
+        Schema projectionForReadableMetrics =
+            new Schema(
+                MetricsUtil.READABLE_METRIC_COLS.stream()
+                    .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
+                    .collect(Collectors.toList()));
+
+        Schema projectionForMetrics = TypeUtil.join(fileProjection, 
projectionForReadableMetrics);
+        return CloseableIterable.transform(files(projectionForMetrics), 
this::withReadableMetrics);
+      }
     }
 
-    private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+    private CloseableIterable<? extends ContentFile<?>> files(Schema 
fileProjection) {
       switch (manifest.content()) {
         case DATA:
-          return ManifestFiles.read(manifest, io, specsById).project(schema);
+          return ManifestFiles.read(manifest, io, 
specsById).project(fileProjection);
         case DELETES:
-          return ManifestFiles.readDeleteManifest(manifest, io, 
specsById).project(schema);
+          return ManifestFiles.readDeleteManifest(manifest, io, 
specsById).project(fileProjection);
         default:
           throw new IllegalArgumentException(
               "Unsupported manifest content type:" + manifest.content());
       }
     }
 
+    private StructLike withReadableMetrics(ContentFile<?> file) {
+      int expectedSize = projection.columns().size();
+      StructType projectedMetricType =
+          
projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
+      MetricsUtil.ReadableMetricsStruct readableMetrics =
+          MetricsUtil.readableMetricsStruct(dataTableSchema, file, 
projectedMetricType);
+      return new ContentFileStructWithMetrics(expectedSize, (StructLike) file, 
readableMetrics);
+    }
+
     @Override
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
@@ -186,4 +219,44 @@ abstract class BaseFilesTable extends BaseMetadataTable {
       return manifest;
     }
   }
+
+  static class ContentFileStructWithMetrics implements StructLike {
+    private final StructLike fileAsStruct;
+    private final MetricsUtil.ReadableMetricsStruct readableMetrics;
+    private final int expectedSize;
+
+    ContentFileStructWithMetrics(
+        int expectedSize,
+        StructLike fileAsStruct,
+        MetricsUtil.ReadableMetricsStruct readableMetrics) {
+      this.fileAsStruct = fileAsStruct;
+      this.readableMetrics = readableMetrics;
+      this.expectedSize = expectedSize;
+    }
+
+    @Override
+    public int size() {
+      return expectedSize;
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      int lastExpectedIndex = expectedSize - 1;
+      if (pos < lastExpectedIndex) {
+        return fileAsStruct.get(pos, javaClass);
+      } else if (pos == lastExpectedIndex) {
+        return javaClass.cast(readableMetrics);
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Illegal position access for ContentFileStructWithMetrics: %d, 
max allowed is %d",
+                pos, lastExpectedIndex));
+      }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("ContentFileStructWithMetrics is 
read only");
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java 
b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
index 98710ef79e..cde9bcb4a0 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -18,11 +18,23 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 public class MetricsUtil {
 
@@ -56,4 +68,292 @@ public class MetricsUtil {
     String columnName = inputSchema.findColumnName(fieldId);
     return metricsConfig.columnMode(columnName);
   }
+
+  public static final List<ReadableMetricColDefinition> READABLE_METRIC_COLS =
+      ImmutableList.of(
+          new ReadableMetricColDefinition(
+              "column_size",
+              "Total size on disk",
+              DataFile.COLUMN_SIZES,
+              field -> Types.LongType.get(),
+              (file, field) ->
+                  file.columnSizes() == null ? null : 
file.columnSizes().get(field.fieldId())),
+          new ReadableMetricColDefinition(
+              "value_count",
+              "Total count, including null and NaN",
+              DataFile.VALUE_COUNTS,
+              field -> Types.LongType.get(),
+              (file, field) ->
+                  file.valueCounts() == null ? null : 
file.valueCounts().get(field.fieldId())),
+          new ReadableMetricColDefinition(
+              "null_value_count",
+              "Null value count",
+              DataFile.NULL_VALUE_COUNTS,
+              field -> Types.LongType.get(),
+              (file, field) ->
+                  file.nullValueCounts() == null
+                      ? null
+                      : file.nullValueCounts().get(field.fieldId())),
+          new ReadableMetricColDefinition(
+              "nan_value_count",
+              "NaN value count",
+              DataFile.NAN_VALUE_COUNTS,
+              field -> Types.LongType.get(),
+              (file, field) ->
+                  file.nanValueCounts() == null
+                      ? null
+                      : file.nanValueCounts().get(field.fieldId())),
+          new ReadableMetricColDefinition(
+              "lower_bound",
+              "Lower bound",
+              DataFile.LOWER_BOUNDS,
+              Types.NestedField::type,
+              (file, field) ->
+                  file.lowerBounds() == null
+                      ? null
+                      : Conversions.fromByteBuffer(
+                          field.type(), 
file.lowerBounds().get(field.fieldId()))),
+          new ReadableMetricColDefinition(
+              "upper_bound",
+              "Upper bound",
+              DataFile.UPPER_BOUNDS,
+              Types.NestedField::type,
+              (file, field) ->
+                  file.upperBounds() == null
+                      ? null
+                      : Conversions.fromByteBuffer(
+                          field.type(), 
file.upperBounds().get(field.fieldId()))));
+
+  public static final String READABLE_METRICS = "readable_metrics";
+
+  /**
+   * Fixed definition of a readable metric column, i.e. a mapping of a raw 
metric to a readable metric
+   */
+  public static class ReadableMetricColDefinition {
+    private final String name;
+    private final String doc;
+    private final Types.NestedField originalCol;
+    private final TypeFunction typeFunction;
+    private final MetricFunction metricFunction;
+
+    public interface TypeFunction {
+      Type type(Types.NestedField originalCol);
+    }
+
+    public interface MetricFunction {
+      Object metric(ContentFile<?> file, Types.NestedField originalCol);
+    }
+
+    /**
+     * @param name column name
+     * @param doc column doc
+     * @param originalCol original (raw) metric column field on metadata table
+     * @param typeFunction function that returns the readable metric column 
type from original field
+     *     type
+     * @param metricFunction function that returns readable metric from data 
file
+     */
+    ReadableMetricColDefinition(
+        String name,
+        String doc,
+        Types.NestedField originalCol,
+        TypeFunction typeFunction,
+        MetricFunction metricFunction) {
+      this.name = name;
+      this.doc = doc;
+      this.originalCol = originalCol;
+      this.typeFunction = typeFunction;
+      this.metricFunction = metricFunction;
+    }
+
+    Types.NestedField originalCol() {
+      return originalCol;
+    }
+
+    Type colType(Types.NestedField field) {
+      return typeFunction.type(field);
+    }
+
+    String name() {
+      return name;
+    }
+
+    String doc() {
+      return doc;
+    }
+
+    Object value(ContentFile<?> dataFile, Types.NestedField dataField) {
+      return metricFunction.metric(dataFile, dataField);
+    }
+  }
+
+  /** A struct of readable metric values for a primitive column */
+  public static class ReadableColMetricsStruct implements StructLike {
+
+    private final String columnName;
+    private final Map<Integer, Integer> projectionMap;
+    private final Object[] metrics;
+
+    public ReadableColMetricsStruct(
+        String columnName, Types.NestedField projection, Object... metrics) {
+      this.columnName = columnName;
+      this.projectionMap = readableMetricsProjection(projection);
+      this.metrics = metrics;
+    }
+
+    @Override
+    public int size() {
+      return projectionMap.size();
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      Object value = get(pos);
+      return value == null ? null : javaClass.cast(value);
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("ReadableColMetricsStruct is 
read only");
+    }
+
+    private Object get(int pos) {
+      int projectedPos = projectionMap.get(pos);
+      return metrics[projectedPos];
+    }
+
+    /** Returns map of projected position to actual position of this struct's 
fields */
+    private Map<Integer, Integer> readableMetricsProjection(Types.NestedField 
projection) {
+      Map<Integer, Integer> result = Maps.newHashMap();
+
+      Set<String> projectedFields =
+          Sets.newHashSet(
+              projection.type().asStructType().fields().stream()
+                  .map(Types.NestedField::name)
+                  .collect(Collectors.toSet()));
+
+      int projectedIndex = 0;
+      for (int fieldIndex = 0; fieldIndex < READABLE_METRIC_COLS.size(); 
fieldIndex++) {
+        ReadableMetricColDefinition readableMetric = 
READABLE_METRIC_COLS.get(fieldIndex);
+
+        if (projectedFields.contains(readableMetric.name())) {
+          result.put(projectedIndex, fieldIndex);
+          projectedIndex++;
+        }
+      }
+      return result;
+    }
+
+    String columnName() {
+      return columnName;
+    }
+  }
+
+  /**
+   * A struct, consisting of all {@link ReadableColMetricsStruct} for all 
primitive columns of the
+   * table
+   */
+  public static class ReadableMetricsStruct implements StructLike {
+
+    private final List<StructLike> columnMetrics;
+
+    public ReadableMetricsStruct(List<StructLike> columnMetrics) {
+      this.columnMetrics = columnMetrics;
+    }
+
+    @Override
+    public int size() {
+      return columnMetrics.size();
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(columnMetrics.get(pos));
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("ReadableMetricsStruct is read 
only");
+    }
+  }
+
+  /**
+   * Calculates a dynamic schema for readable_metrics to add to metadata 
tables. The type will be
+   * the struct {@link ReadableMetricsStruct}, composed of {@link 
ReadableColMetricsStruct} for all
+   * primitive columns in the data table
+   *
+   * @param dataTableSchema schema of data table
+   * @param metadataTableSchema schema of existing metadata table (to ensure 
id uniqueness)
+   * @return schema of readable_metrics struct
+   */
+  public static Schema readableMetricsSchema(Schema dataTableSchema, Schema 
metadataTableSchema) {
+    List<Types.NestedField> fields = Lists.newArrayList();
+    Map<Integer, String> idToName = dataTableSchema.idToName();
+    AtomicInteger nextId = new 
AtomicInteger(metadataTableSchema.highestFieldId());
+
+    for (int id : idToName.keySet()) {
+      Types.NestedField field = dataTableSchema.findField(id);
+
+      if (field.type().isPrimitiveType()) {
+        String colName = idToName.get(id);
+
+        fields.add(
+            Types.NestedField.of(
+                nextId.incrementAndGet(),
+                true,
+                colName,
+                Types.StructType.of(
+                    READABLE_METRIC_COLS.stream()
+                        .map(
+                            m ->
+                                optional(
+                                    nextId.incrementAndGet(), m.name(), 
m.colType(field), m.doc()))
+                        .collect(Collectors.toList())),
+                String.format("Metrics for column %s", colName)));
+      }
+    }
+
+    fields.sort(Comparator.comparing(Types.NestedField::name));
+    return new Schema(
+        optional(
+            nextId.incrementAndGet(),
+            "readable_metrics",
+            Types.StructType.of(fields),
+            "Column metrics in readable form"));
+  }
+
+  /**
+   * Return a readable metrics struct row from file metadata
+   *
+   * @param schema schema of original data table
+   * @param file content file with metrics
+   * @param projectedSchema user requested projection
+   * @return {@link ReadableMetricsStruct}
+   */
+  public static ReadableMetricsStruct readableMetricsStruct(
+      Schema schema, ContentFile<?> file, Types.StructType projectedSchema) {
+    Map<Integer, String> idToName = schema.idToName();
+    List<ReadableColMetricsStruct> colMetrics = Lists.newArrayList();
+
+    for (int id : idToName.keySet()) {
+      String qualifiedName = idToName.get(id);
+      Types.NestedField field = schema.findField(id);
+
+      Object[] metrics =
+          READABLE_METRIC_COLS.stream()
+              .map(readableMetric -> readableMetric.value(file, field))
+              .toArray();
+
+      if (field.type().isPrimitiveType() // Iceberg stores metrics only for 
primitive types
+          && projectedSchema.field(qualifiedName)
+              != null) { // User has requested this column metric
+        colMetrics.add(
+            new ReadableColMetricsStruct(
+                qualifiedName, projectedSchema.field(qualifiedName), metrics));
+      }
+    }
+
+    
colMetrics.sort(Comparator.comparing(ReadableColMetricsStruct::columnName));
+    return new ReadableMetricsStruct(
+        colMetrics.stream().map(m -> (StructLike) 
m).collect(Collectors.toList()));
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index e816b10db8..a940932209 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -527,6 +527,93 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     Assert.assertEquals(expected, scan.schema().asStruct());
   }
 
+  @Test
+  public void testFilesTableReadableMetricsSchema() {
+    Table filesTable = new FilesTable(table.ops(), table);
+    Types.StructType actual = 
filesTable.newScan().schema().select("readable_metrics").asStruct();
+    int highestId = filesTable.schema().highestFieldId();
+
+    Types.StructType expected =
+        Types.StructType.of(
+            optional(
+                highestId,
+                "readable_metrics",
+                Types.StructType.of(
+                    Types.NestedField.optional(
+                        highestId - 14,
+                        "data",
+                        Types.StructType.of(
+                            Types.NestedField.optional(
+                                highestId - 13,
+                                "column_size",
+                                Types.LongType.get(),
+                                "Total size on disk"),
+                            Types.NestedField.optional(
+                                highestId - 12,
+                                "value_count",
+                                Types.LongType.get(),
+                                "Total count, including null and NaN"),
+                            Types.NestedField.optional(
+                                highestId - 11,
+                                "null_value_count",
+                                Types.LongType.get(),
+                                "Null value count"),
+                            Types.NestedField.optional(
+                                highestId - 10,
+                                "nan_value_count",
+                                Types.LongType.get(),
+                                "NaN value count"),
+                            Types.NestedField.optional(
+                                highestId - 9,
+                                "lower_bound",
+                                Types.StringType.get(),
+                                "Lower bound"),
+                            Types.NestedField.optional(
+                                highestId - 8,
+                                "upper_bound",
+                                Types.StringType.get(),
+                                "Upper bound")),
+                        "Metrics for column data"),
+                    Types.NestedField.optional(
+                        highestId - 7,
+                        "id",
+                        Types.StructType.of(
+                            Types.NestedField.optional(
+                                highestId - 6,
+                                "column_size",
+                                Types.LongType.get(),
+                                "Total size on disk"),
+                            Types.NestedField.optional(
+                                highestId - 5,
+                                "value_count",
+                                Types.LongType.get(),
+                                "Total count, including null and NaN"),
+                            Types.NestedField.optional(
+                                highestId - 4,
+                                "null_value_count",
+                                Types.LongType.get(),
+                                "Null value count"),
+                            Types.NestedField.optional(
+                                highestId - 3,
+                                "nan_value_count",
+                                Types.LongType.get(),
+                                "NaN value count"),
+                            Types.NestedField.optional(
+                                highestId - 2,
+                                "lower_bound",
+                                Types.IntegerType.get(),
+                                "Lower bound"),
+                            Types.NestedField.optional(
+                                highestId - 1,
+                                "upper_bound",
+                                Types.IntegerType.get(),
+                                "Upper bound")),
+                        "Metrics for column id")),
+                "Column metrics in readable form"));
+
+    Assert.assertEquals("Dynamic schema for readable_metrics should match", 
actual, expected);
+  }
+
   @Test
   public void testPartitionSpecEvolutionAdditive() {
     preparePartitionedTable();
diff --git 
a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 675a330c84..07d77dc99b 100644
--- 
a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -55,6 +55,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -68,12 +71,14 @@ import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -420,7 +425,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -443,8 +447,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+    List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -463,7 +468,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, 
actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), 
expected.get(0), actual.get(0));
   }
 
   @Test
@@ -474,7 +479,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
"true").commit();
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     spark.sql(
         String.format(
@@ -488,6 +492,11 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
     inputDF.select("data", 
"id").write().mode("overwrite").insertInto("parquet_table");
 
+    NameMapping mapping = MappingUtil.create(table.schema());
+    String mappingJson = NameMappingParser.toJson(mapping);
+
+    table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, 
mappingJson).commit();
+
     try {
       String stagingLocation = table.location() + "/metadata";
       SparkTableUtil.importSparkTable(
@@ -496,12 +505,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
           table,
           stagingLocation);
 
-      List<Row> actual =
-          spark
-              .read()
-              .format("iceberg")
-              .load(loadLocation(tableIdentifier, "files"))
-              .collectAsList();
+      Dataset<Row> filesTableDs =
+          spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+      List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
       for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -516,10 +522,11 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         }
       }
 
+      Types.StructType struct = nonDerivedSchema(filesTableDs);
       Assert.assertEquals("Files table should have one row", 2, 
expected.size());
       Assert.assertEquals("Actual results should have one row", 2, 
actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(1), actual.get(1));
+      TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+      TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
     } finally {
       spark.sql("DROP TABLE parquet_table");
     }
@@ -582,7 +589,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"unpartitioned_files_test");
     Table table = createTable(tableIdentifier, SCHEMA, 
PartitionSpec.unpartitioned());
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -609,8 +615,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFile(toDelete).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+    List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -629,7 +636,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, 
actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), 
expected.get(0), actual.get(0));
   }
 
   @Test
@@ -692,7 +699,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "all_data_files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -718,20 +724,16 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // ensure table data isn't stale
     table.refresh();
 
-    List<Row> actual =
-        spark
-            .read()
-            .format("iceberg")
-            .load(loadLocation(tableIdentifier, "all_data_files"))
-            .orderBy("file_path")
-            .collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"all_data_files"));
+    List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest :
+    Iterable<ManifestFile> dataManifests =
         Iterables.concat(
-            Iterables.transform(
-                table.snapshots(), snapshot -> 
snapshot.dataManifests(table.io())))) {
+            Iterables.transform(table.snapshots(), snapshot -> 
snapshot.dataManifests(table.io())));
+    for (ManifestFile manifest : dataManifests) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows =
           Avro.read(in).project(entriesTable.schema()).build()) {
@@ -750,7 +752,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Assert.assertEquals("Files table should have two rows", 2, 
expected.size());
     Assert.assertEquals("Actual results should have two rows", 2, 
actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(i), actual.get(i));
+      TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), 
expected.get(i), actual.get(i));
     }
   }
 
@@ -1751,7 +1753,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         .build();
   }
 
-  private void asMetadataRecord(GenericData.Record file) {
+  public static void asMetadataRecord(GenericData.Record file) {
     file.put(0, FileContent.DATA.id());
     file.put(3, 0); // specId
   }
@@ -1783,4 +1785,17 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
 
     return positionDeleteWriter.toDeleteFile();
   }
+
+  public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+    StructField[] fields = metadataTable.schema().fields();
+    return metadataTable.select(
+        Stream.of(fields)
+            .filter(f -> !f.name().equals("readable_metrics")) // derived field
+            .map(f -> new Column(f.name()))
+            .toArray(Column[]::new));
+  }
+
+  public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+    return 
SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
+  }
 }
diff --git 
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 1976d71b6c..c51470f19c 100644
--- 
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -55,6 +55,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -68,12 +71,14 @@ import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -418,7 +423,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -441,8 +445,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+    List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -461,7 +466,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, 
actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), 
expected.get(0), actual.get(0));
   }
 
   @Test
@@ -472,7 +477,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
"true").commit();
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     spark.sql(
         String.format(
@@ -486,6 +490,11 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
     inputDF.select("data", 
"id").write().mode("overwrite").insertInto("parquet_table");
 
+    NameMapping mapping = MappingUtil.create(table.schema());
+    String mappingJson = NameMappingParser.toJson(mapping);
+
+    table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, 
mappingJson).commit();
+
     try {
       String stagingLocation = table.location() + "/metadata";
       SparkTableUtil.importSparkTable(
@@ -494,12 +503,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
           table,
           stagingLocation);
 
-      List<Row> actual =
-          spark
-              .read()
-              .format("iceberg")
-              .load(loadLocation(tableIdentifier, "files"))
-              .collectAsList();
+      Dataset<Row> filesTableDs =
+          spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+      List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
       for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -514,10 +520,11 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         }
       }
 
+      Types.StructType struct = nonDerivedSchema(filesTableDs);
       Assert.assertEquals("Files table should have one row", 2, 
expected.size());
       Assert.assertEquals("Actual results should have one row", 2, 
actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(1), actual.get(1));
+      TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+      TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
     } finally {
       spark.sql("DROP TABLE parquet_table");
     }
@@ -580,7 +587,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"unpartitioned_files_test");
     Table table = createTable(tableIdentifier, SCHEMA, 
PartitionSpec.unpartitioned());
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -607,8 +613,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFile(toDelete).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+    List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -627,7 +634,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, 
actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), 
expected.get(0), actual.get(0));
   }
 
   @Test
@@ -690,7 +697,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "all_data_files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -716,13 +722,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // ensure table data isn't stale
     table.refresh();
 
-    List<Row> actual =
-        spark
-            .read()
-            .format("iceberg")
-            .load(loadLocation(tableIdentifier, "all_data_files"))
-            .orderBy("file_path")
-            .collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"all_data_files"));
+    List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
@@ -748,7 +750,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Assert.assertEquals("Files table should have two rows", 2, 
expected.size());
     Assert.assertEquals("Actual results should have two rows", 2, 
actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(i), actual.get(i));
+      TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), 
expected.get(i), actual.get(i));
     }
   }
 
@@ -1789,11 +1791,6 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
         .build();
   }
 
-  private void asMetadataRecord(GenericData.Record file) {
-    file.put(0, FileContent.DATA.id());
-    file.put(3, 0); // specId
-  }
-
   private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
       Table table, PartitionSpec spec, StructLike partition) {
     OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 
0).build();
@@ -1821,4 +1818,22 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
 
     return positionDeleteWriter.toDeleteFile();
   }
+
+  private static void asMetadataRecord(GenericData.Record file) {
+    file.put(0, FileContent.DATA.id());
+    file.put(3, 0); // specId
+  }
+
+  public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+    StructField[] fields = metadataTable.schema().fields();
+    return metadataTable.select(
+        Stream.of(fields)
+            .filter(f -> !f.name().equals("readable_metrics")) // derived field
+            .map(f -> new Column(f.name()))
+            .toArray(Column[]::new));
+  }
+
+  public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+    return 
SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
+  }
 }
diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 64a10bfdcd..a60e0f5d93 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -39,8 +39,10 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -90,8 +92,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".files").schema();
 
     // check delete files table
-    List<Row> actualDeleteFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".delete_files").collectAsList();
+    Dataset<Row> actualDeleteFilesDs = spark.sql("SELECT * FROM " + tableName 
+ ".delete_files");
+    List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
     Assert.assertEquals(
         "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
 
@@ -100,22 +102,28 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, null);
     Assert.assertEquals("Should be one delete file manifest entry", 1, 
expectedDeleteFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // check data files table
-    List<Row> actualDataFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".data_files").collectAsList();
+    Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + 
".data_files");
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
 
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
     Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDataFilesDs),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     // check all files table
-    List<Row> actualFiles =
-        spark.sql("SELECT * FROM " + tableName + ".files ORDER BY 
content").collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
 
     List<Record> expectedFiles =
@@ -123,9 +131,9 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             .collect(Collectors.toList());
     Assert.assertEquals("Should have two files manifest entries", 2, 
expectedFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
   }
 
   @Test
@@ -175,27 +183,31 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Assert.assertEquals(
         "Should have one delete file manifest entry", 1, 
expectedDeleteFiles.size());
 
-    List<Row> actualDeleteFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDeleteFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE 
partition.data='a'");
+    List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
+
     Assert.assertEquals(
         "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // Check data files table
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, "a");
     Assert.assertEquals("Should have one data file manifest entry", 1, 
expectedDataFiles.size());
 
-    List<Row> actualDataFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".data_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDataFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".data_files " + "WHERE 
partition.data='a'");
+
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDataFilesDs),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     List<Row> actualPartitionsWithProjection =
         spark.sql("SELECT file_count FROM " + tableName + ".partitions 
").collectAsList();
@@ -213,19 +225,15 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             .collect(Collectors.toList());
     Assert.assertEquals("Should have two file manifest entries", 2, 
expectedFiles.size());
 
-    List<Row> actualFiles =
-        spark
-            .sql(
-                "SELECT * FROM "
-                    + tableName
-                    + ".files "
-                    + "WHERE partition.data='a' ORDER BY content")
-            .collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql(
+            "SELECT * FROM " + tableName + ".files " + "WHERE 
partition.data='a' ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
   }
 
   @Test
@@ -265,19 +273,22 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         Spark3Util.loadIcebergTable(spark, tableName + 
".all_data_files").schema();
 
     // Check all data files table
-    List<Row> actualDataFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".all_data_files").collectAsList();
+    Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + 
".all_data_files");
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
 
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
     Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDataFilesDs),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     // Check all delete files table
-    List<Row> actualDeleteFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".all_delete_files").collectAsList();
+    Dataset<Row> actualDeleteFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_delete_files");
+    List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, null);
@@ -285,15 +296,19 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Assert.assertEquals(
         "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // Check all files table
-    List<Row> actualFiles =
-        spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY 
content").collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY 
content");
+    List<Row> actualFiles = actualFilesDs.collectAsList();
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, 
expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) 
r.get("content"))));
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
-    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, 
actualFiles);
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, 
actualFiles);
   }
 
   @Test
@@ -341,22 +356,24 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         Spark3Util.loadIcebergTable(spark, tableName + 
".all_data_files").schema();
 
     // Check all data files table
-    List<Row> actualDataFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDataFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE 
partition.data='a'");
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, "a");
     Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        
SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema())
+            .asStruct(),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     // Check all delete files table
-    List<Row> actualDeleteFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDeleteFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE 
partition.data='a'");
+    List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
+
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, "a");
@@ -364,21 +381,24 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDeleteFiles.size());
 
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // Check all files table
-    List<Row> actualFiles =
-        spark
-            .sql(
-                "SELECT * FROM "
-                    + tableName
-                    + ".all_files WHERE partition.data='a' "
-                    + "ORDER BY content")
-            .collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql(
+            "SELECT * FROM "
+                + tableName
+                + ".all_files WHERE partition.data='a' "
+                + "ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, 
expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) 
r.get("content"))));
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
-    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, 
actualFiles);
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualDataFilesDs), expectedFiles, 
actualFiles);
   }
 
   @Test
@@ -615,7 +635,7 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             Record file = (Record) record.get("data_file");
             if (partitionMatch(file, partValue)) {
-              asMetadataRecord(file, expectedContent);
+              TestHelpers.asMetadataRecord(file, expectedContent);
               expected.add(file);
             }
           }
@@ -625,12 +645,6 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     return expected;
   }
 
-  // Populate certain fields derived in the metadata tables
-  private void asMetadataRecord(Record file, FileContent content) {
-    file.put(0, content.id());
-    file.put(3, 0); // specId
-  }
-
   private boolean partitionMatch(Record file, String partValue) {
     if (partValue == null) {
       return true;
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index e5ad0ca213..59074bbd92 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -38,12 +38,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
@@ -51,10 +53,13 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -817,4 +822,27 @@ public class TestHelpers {
         .map(ManifestFile::path)
         .collect(Collectors.toSet());
   }
+
+  public static void asMetadataRecord(GenericData.Record file, FileContent 
content) {
+    file.put(0, content.id()); // position 0 <- id of the given FileContent
+    file.put(3, 0); // specId
+  }
+
+  public static void asMetadataRecord(GenericData.Record file) {
+    // data-file variant: reuse the two-argument overload instead of duplicating it
+    asMetadataRecord(file, FileContent.DATA);
+  }
+
+  // Projects every column of the metadata table except the derived readable_metrics column.
+  public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+    return metadataTable.select(
+        Stream.of(metadataTable.schema().fields())
+            .filter(f -> !"readable_metrics".equals(f.name())) // derived field
+            .map(f -> new Column(f.name()))
+            .toArray(Column[]::new));
+  }
+
+  public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+    return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
+  }
 }
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 57ce654a71..361011dd1c 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -36,7 +36,6 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -55,6 +54,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -173,7 +175,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             row.put(2, 0L); // data sequence number
             row.put(3, 0L); // file sequence number
             GenericData.Record file = (GenericData.Record) 
row.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(row);
           });
     }
@@ -367,7 +369,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
               row.put(2, 0L); // data sequence number
               row.put(3, 0L); // file sequence number
               GenericData.Record file = (GenericData.Record) 
row.get("data_file");
-              asMetadataRecord(file);
+              TestHelpers.asMetadataRecord(file);
               expected.add(row);
             });
       }
@@ -418,7 +420,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -441,8 +442,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+    List<Row> actual = 
TestHelpers.selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -452,7 +454,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             GenericData.Record file = (GenericData.Record) 
record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
@@ -461,7 +463,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, 
actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), 
actual.get(0));
   }
 
   @Test
@@ -472,7 +475,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
"true").commit();
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     spark.sql(
         String.format(
@@ -486,6 +488,11 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
     inputDF.select("data", 
"id").write().mode("overwrite").insertInto("parquet_table");
 
+    NameMapping mapping = MappingUtil.create(table.schema());
+    String mappingJson = NameMappingParser.toJson(mapping);
+
+    table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, 
mappingJson).commit();
+
     try {
       String stagingLocation = table.location() + "/metadata";
       SparkTableUtil.importSparkTable(
@@ -494,12 +501,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
           table,
           stagingLocation);
 
-      List<Row> actual =
-          spark
-              .read()
-              .format("iceberg")
-              .load(loadLocation(tableIdentifier, "files"))
-              .collectAsList();
+      Dataset<Row> filesTableDs =
+          spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+      List<Row> actual = 
TestHelpers.selectNonDerived(filesTableDs).collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
       for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -508,16 +512,17 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             Avro.read(in).project(entriesTable.schema()).build()) {
           for (GenericData.Record record : rows) {
             GenericData.Record file = (GenericData.Record) 
record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
       }
 
+      Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
       Assert.assertEquals("Files table should have one row", 2, 
expected.size());
       Assert.assertEquals("Actual results should have one row", 2, 
actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(1), actual.get(1));
+      TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+      TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
     } finally {
       spark.sql("DROP TABLE parquet_table");
     }
@@ -580,7 +585,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"unpartitioned_files_test");
     Table table = createTable(tableIdentifier, SCHEMA, 
PartitionSpec.unpartitioned());
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -607,8 +611,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFile(toDelete).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"files"));
+    List<Row> actual = 
TestHelpers.selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : 
table.currentSnapshot().dataManifests(table.io())) {
@@ -618,7 +623,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             GenericData.Record file = (GenericData.Record) 
record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
@@ -627,7 +632,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, 
actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), 
actual.get(0));
   }
 
   @Test
@@ -690,7 +696,6 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "all_data_files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -716,13 +721,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // ensure table data isn't stale
     table.refresh();
 
-    List<Row> actual =
-        spark
-            .read()
-            .format("iceberg")
-            .load(loadLocation(tableIdentifier, "all_data_files"))
-            .orderBy("file_path")
-            .collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, 
"all_data_files"));
+    List<Row> actual = 
TestHelpers.selectNonDerived(filesTableDs).collectAsList();
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
@@ -736,7 +737,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             GenericData.Record file = (GenericData.Record) 
record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
@@ -748,7 +749,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     Assert.assertEquals("Files table should have two rows", 2, 
expected.size());
     Assert.assertEquals("Actual results should have two rows", 2, 
actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), 
expected.get(i), actual.get(i));
+      TestHelpers.assertEqualsSafe(
+          TestHelpers.nonDerivedSchema(filesTableDs), expected.get(i), 
actual.get(i));
     }
   }
 
@@ -1806,11 +1808,6 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
         .build();
   }
 
-  private void asMetadataRecord(GenericData.Record file) {
-    file.put(0, FileContent.DATA.id());
-    file.put(3, 0); // specId
-  }
-
   private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
       Table table, PartitionSpec spec, StructLike partition) {
     OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 
0).build();
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 64a10bfdcd..a60e0f5d93 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -39,8 +39,10 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -90,8 +92,8 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".files").schema();
 
     // check delete files table
-    List<Row> actualDeleteFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".delete_files").collectAsList();
+    Dataset<Row> actualDeleteFilesDs = spark.sql("SELECT * FROM " + tableName 
+ ".delete_files");
+    List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
     Assert.assertEquals(
         "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
 
@@ -100,22 +102,28 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, null);
     Assert.assertEquals("Should be one delete file manifest entry", 1, 
expectedDeleteFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // check data files table
-    List<Row> actualDataFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".data_files").collectAsList();
+    Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + 
".data_files");
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
 
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
     Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDataFilesDs),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     // check all files table
-    List<Row> actualFiles =
-        spark.sql("SELECT * FROM " + tableName + ".files ORDER BY 
content").collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
 
     List<Record> expectedFiles =
@@ -123,9 +131,9 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             .collect(Collectors.toList());
     Assert.assertEquals("Should have two files manifest entries", 2, 
expectedFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
   }
 
   @Test
@@ -175,27 +183,31 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Assert.assertEquals(
         "Should have one delete file manifest entry", 1, 
expectedDeleteFiles.size());
 
-    List<Row> actualDeleteFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDeleteFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE 
partition.data='a'");
+    List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
+
     Assert.assertEquals(
         "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // Check data files table
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, "a");
     Assert.assertEquals("Should have one data file manifest entry", 1, 
expectedDataFiles.size());
 
-    List<Row> actualDataFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".data_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDataFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".data_files " + "WHERE 
partition.data='a'");
+
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDataFilesDs),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     List<Row> actualPartitionsWithProjection =
         spark.sql("SELECT file_count FROM " + tableName + ".partitions 
").collectAsList();
@@ -213,19 +225,15 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
             .collect(Collectors.toList());
     Assert.assertEquals("Should have two file manifest entries", 2, 
expectedFiles.size());
 
-    List<Row> actualFiles =
-        spark
-            .sql(
-                "SELECT * FROM "
-                    + tableName
-                    + ".files "
-                    + "WHERE partition.data='a' ORDER BY content")
-            .collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql(
+            "SELECT * FROM " + tableName + ".files " + "WHERE 
partition.data='a' ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
   }
 
   @Test
@@ -265,19 +273,22 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         Spark3Util.loadIcebergTable(spark, tableName + 
".all_data_files").schema();
 
     // Check all data files table
-    List<Row> actualDataFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".all_data_files").collectAsList();
+    Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + 
".all_data_files");
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
 
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
     Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDataFilesDs),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     // Check all delete files table
-    List<Row> actualDeleteFiles =
-        spark.sql("SELECT * FROM " + tableName + 
".all_delete_files").collectAsList();
+    Dataset<Row> actualDeleteFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_delete_files");
+    List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, null);
@@ -285,15 +296,19 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Assert.assertEquals(
         "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // Check all files table
-    List<Row> actualFiles =
-        spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY 
content").collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY 
content");
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, 
expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) 
r.get("content"))));
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
-    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, 
actualFiles);
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, 
actualFiles);
   }
 
   @Test
@@ -341,22 +356,24 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
         Spark3Util.loadIcebergTable(spark, tableName + 
".all_data_files").schema();
 
     // Check all data files table
-    List<Row> actualDataFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDataFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE 
partition.data='a'");
+    List<Row> actualDataFiles = 
TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
     List<Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, "a");
     Assert.assertEquals("Should be one data file manifest entry", 1, 
expectedDataFiles.size());
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDataFiles.get(0), 
actualDataFiles.get(0));
+        
SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema())
+            .asStruct(),
+        expectedDataFiles.get(0),
+        actualDataFiles.get(0));
 
     // Check all delete files table
-    List<Row> actualDeleteFiles =
-        spark
-            .sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE 
partition.data='a'")
-            .collectAsList();
+    Dataset<Row> actualDeleteFilesDs =
+        spark.sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE 
partition.data='a'");
+    List<Row> actualDeleteFiles = 
TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
+
     List<Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.POSITION_DELETES, entriesTableSchema, 
expectedDeleteManifests, "a");
@@ -364,21 +381,24 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     Assert.assertEquals("Metadata table should return one data file", 1, 
actualDeleteFiles.size());
 
     TestHelpers.assertEqualsSafe(
-        filesTableSchema.asStruct(), expectedDeleteFiles.get(0), 
actualDeleteFiles.get(0));
+        TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+        expectedDeleteFiles.get(0),
+        actualDeleteFiles.get(0));
 
     // Check all files table
-    List<Row> actualFiles =
-        spark
-            .sql(
-                "SELECT * FROM "
-                    + tableName
-                    + ".all_files WHERE partition.data='a' "
-                    + "ORDER BY content")
-            .collectAsList();
+    Dataset<Row> actualFilesDs =
+        spark.sql(
+            "SELECT * FROM "
+                + tableName
+                + ".all_files WHERE partition.data='a' "
+                + "ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, 
expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) 
r.get("content"))));
     Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
-    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, 
actualFiles);
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, actualFiles);
   }
 
   @Test
@@ -615,7 +635,7 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             Record file = (Record) record.get("data_file");
             if (partitionMatch(file, partValue)) {
-              asMetadataRecord(file, expectedContent);
+              TestHelpers.asMetadataRecord(file, expectedContent);
               expected.add(file);
             }
           }
@@ -625,12 +645,6 @@ public class TestMetadataTables extends 
SparkExtensionsTestBase {
     return expected;
   }
 
-  // Populate certain fields derived in the metadata tables
-  private void asMetadataRecord(Record file, FileContent content) {
-    file.put(0, content.id());
-    file.put(3, 0); // specId
-  }
-
   private boolean partitionMatch(Record file, String partValue) {
     if (partValue == null) {
       return true;
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index e5ad0ca213..59074bbd92 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -38,12 +38,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
@@ -51,10 +53,13 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -817,4 +822,27 @@ public class TestHelpers {
         .map(ManifestFile::path)
         .collect(Collectors.toSet());
   }
+
+  public static void asMetadataRecord(GenericData.Record file, FileContent 
content) {
+    file.put(0, content.id()); // position 0 <- id of the given FileContent
+    file.put(3, 0); // specId
+  }
+
+  public static void asMetadataRecord(GenericData.Record file) {
+    // data-file variant: reuse the two-argument overload instead of duplicating it
+    asMetadataRecord(file, FileContent.DATA);
+  }
+
+  // Projects every column of the metadata table except the derived readable_metrics column.
+  public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+    return metadataTable.select(
+        Stream.of(metadataTable.schema().fields())
+            .filter(f -> !"readable_metrics".equals(f.name())) // derived field
+            .map(f -> new Column(f.name()))
+            .toArray(Column[]::new));
+  }
+
+  public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+    return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
+  }
 }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 57ce654a71..86bb980953 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -36,7 +36,6 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -55,6 +54,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -173,7 +175,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             row.put(2, 0L); // data sequence number
             row.put(3, 0L); // file sequence number
             GenericData.Record file = (GenericData.Record) row.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(row);
           });
     }
@@ -367,7 +369,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
               row.put(2, 0L); // data sequence number
               row.put(3, 0L); // file sequence number
               GenericData.Record file = (GenericData.Record) row.get("data_file");
-              asMetadataRecord(file);
+              TestHelpers.asMetadataRecord(file);
               expected.add(row);
             });
       }
@@ -418,7 +420,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -441,8 +442,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -452,7 +454,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             GenericData.Record file = (GenericData.Record) record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
@@ -461,7 +463,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
   }
 
   @Test
@@ -472,7 +476,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     spark.sql(
         String.format(
@@ -486,6 +489,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
     inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
 
+    NameMapping mapping = MappingUtil.create(table.schema());
+    String mappingJson = NameMappingParser.toJson(mapping);
+
+    table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
+
     try {
       String stagingLocation = table.location() + "/metadata";
       SparkTableUtil.importSparkTable(
@@ -494,12 +502,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
           table,
           stagingLocation);
 
-      List<Row> actual =
-          spark
-              .read()
-              .format("iceberg")
-              .load(loadLocation(tableIdentifier, "files"))
-              .collectAsList();
+      Dataset<Row> filesTableDs =
+          spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+      List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
       for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -508,16 +513,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             Avro.read(in).project(entriesTable.schema()).build()) {
           for (GenericData.Record record : rows) {
             GenericData.Record file = (GenericData.Record) record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
       }
 
+      Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
       Assert.assertEquals("Files table should have one row", 2, expected.size());
       Assert.assertEquals("Actual results should have one row", 2, actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
+      TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+      TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
     } finally {
       spark.sql("DROP TABLE parquet_table");
     }
@@ -580,7 +586,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -607,8 +612,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFile(toDelete).commit();
 
-    List<Row> actual =
-        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -618,7 +624,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             GenericData.Record file = (GenericData.Record) record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
@@ -627,7 +633,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     Assert.assertEquals("Files table should have one row", 1, expected.size());
     Assert.assertEquals("Actual results should have one row", 1, actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
   }
 
   @Test
@@ -690,7 +697,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table entriesTable = loadTable(tableIdentifier, "entries");
-    Table filesTable = loadTable(tableIdentifier, "all_data_files");
 
     Dataset<Row> df1 =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -716,13 +722,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     // ensure table data isn't stale
     table.refresh();
 
-    List<Row> actual =
-        spark
-            .read()
-            .format("iceberg")
-            .load(loadLocation(tableIdentifier, "all_data_files"))
-            .orderBy("file_path")
-            .collectAsList();
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
+    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
@@ -736,7 +738,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
             GenericData.Record file = (GenericData.Record) record.get("data_file");
-            asMetadataRecord(file);
+            TestHelpers.asMetadataRecord(file);
             expected.add(file);
           }
         }
@@ -748,7 +750,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     Assert.assertEquals("Files table should have two rows", 2, expected.size());
     Assert.assertEquals("Actual results should have two rows", 2, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
+      TestHelpers.assertEqualsSafe(
+          TestHelpers.nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
     }
   }
 
@@ -1806,11 +1809,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .build();
   }
 
-  private void asMetadataRecord(GenericData.Record file) {
-    file.put(0, FileContent.DATA.id());
-    file.put(3, 0); // specId
-  }
-
   private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
       Table table, PartitionSpec spec, StructLike partition) {
     OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java
new file mode 100644
index 0000000000..343943b0f8
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestMetadataTableReadableMetrics extends SparkTestBaseWithCatalog {
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Types.StructType LEAF_STRUCT_TYPE =
+      Types.StructType.of(
+          optional(1, "leafLongCol", Types.LongType.get()),
+          optional(2, "leafDoubleCol", Types.DoubleType.get()));
+
+  private static final Types.StructType NESTED_STRUCT_TYPE =
+      Types.StructType.of(required(3, "leafStructCol", LEAF_STRUCT_TYPE));
+
+  private static final Schema NESTED_SCHEMA =
+      new Schema(required(4, "nestedStructCol", NESTED_STRUCT_TYPE));
+
+  private static final Schema PRIMITIVE_SCHEMA =
+      new Schema(
+          required(1, "booleanCol", Types.BooleanType.get()),
+          required(2, "intCol", Types.IntegerType.get()),
+          required(3, "longCol", Types.LongType.get()),
+          required(4, "floatCol", Types.FloatType.get()),
+          required(5, "doubleCol", Types.DoubleType.get()),
+          optional(6, "decimalCol", Types.DecimalType.of(10, 2)),
+          optional(7, "stringCol", Types.StringType.get()),
+          optional(8, "fixedCol", Types.FixedType.ofLength(3)),
+          optional(9, "binaryCol", Types.BinaryType.get()));
+
+  public TestMetadataTableReadableMetrics() {
+    // only SparkCatalog supports metadata table sql queries
+    super(SparkCatalogConfig.HIVE);
+  }
+
+  protected String tableName() {
+    return tableName.split("\\.")[2];
+  }
+
+  protected String database() {
+    return tableName.split("\\.")[1];
+  }
+
+  private Table createPrimitiveTable() throws IOException {
+    Table table =
+        catalog.createTable(
+            TableIdentifier.of(Namespace.of(database()), tableName()),
+            PRIMITIVE_SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of());
+    List<Record> records =
+        Lists.newArrayList(
+            createPrimitiveRecord(
+                false,
+                1,
+                1L,
+                0,
+                1.0D,
+                new BigDecimal("1.00"),
+                "1",
+                Base64.getDecoder().decode("1111"),
+                ByteBuffer.wrap(Base64.getDecoder().decode("1111"))),
+            createPrimitiveRecord(
+                true,
+                2,
+                2L,
+                0,
+                2.0D,
+                new BigDecimal("2.00"),
+                "2",
+                Base64.getDecoder().decode("2222"),
+                ByteBuffer.wrap(Base64.getDecoder().decode("2222"))),
+            createPrimitiveRecord(false, 1, 1, Float.NaN, Double.NaN, null, "1", null, null),
+            createPrimitiveRecord(
+                false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2", null, null));
+
+    DataFile dataFile =
+        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records);
+    table.newAppend().appendFile(dataFile).commit();
+    return table;
+  }
+
+  private void createNestedTable() throws IOException {
+    Table table =
+        catalog.createTable(
+            TableIdentifier.of(Namespace.of(database()), tableName()),
+            NESTED_SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of());
+
+    List<Record> records =
+        Lists.newArrayList(
+            createNestedRecord(0L, 0.0),
+            createNestedRecord(1L, Double.NaN),
+            createNestedRecord(null, null));
+    DataFile dataFile =
+        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records);
+    table.newAppend().appendFile(dataFile).commit();
+  }
+
+  @After
+  public void dropTable() {
+    sql("DROP TABLE %s", tableName);
+  }
+
+  private Dataset<Row> filesDf() {
+    return spark.read().format("iceberg").load(database() + "." + tableName() + ".files");
+  }
+
+  protected GenericRecord createPrimitiveRecord(
+      boolean booleanCol,
+      int intCol,
+      long longCol,
+      float floatCol,
+      double doubleCol,
+      BigDecimal decimalCol,
+      String stringCol,
+      byte[] fixedCol,
+      ByteBuffer binaryCol) {
+    GenericRecord record = GenericRecord.create(PRIMITIVE_SCHEMA);
+    record.set(0, booleanCol);
+    record.set(1, intCol);
+    record.set(2, longCol);
+    record.set(3, floatCol);
+    record.set(4, doubleCol);
+    record.set(5, decimalCol);
+    record.set(6, stringCol);
+    record.set(7, fixedCol);
+    record.set(8, binaryCol);
+    return record;
+  }
+
+  private GenericRecord createNestedRecord(Long longCol, Double doubleCol) {
+    GenericRecord record = GenericRecord.create(NESTED_SCHEMA);
+    GenericRecord nested = GenericRecord.create(NESTED_STRUCT_TYPE);
+    GenericRecord leaf = GenericRecord.create(LEAF_STRUCT_TYPE);
+    leaf.set(0, longCol);
+    leaf.set(1, doubleCol);
+    nested.set(0, leaf);
+    record.set(0, nested);
+    return record;
+  }
+
+  @Test
+  public void testPrimitiveColumns() throws Exception {
+    createPrimitiveTable();
+
+    Object[] binaryCol =
+        row(
+            59L,
+            4L,
+            2L,
+            null,
+            Base64.getDecoder().decode("1111"),
+            Base64.getDecoder().decode("2222"));
+    Object[] booleanCol = row(44L, 4L, 0L, null, false, true);
+    Object[] decimalCol = row(97L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
+    Object[] doubleCol = row(99L, 4L, 0L, 1L, 1.0D, 2.0D);
+    Object[] fixedCol =
+        row(
+            55L,
+            4L,
+            2L,
+            null,
+            Base64.getDecoder().decode("1111"),
+            Base64.getDecoder().decode("2222"));
+    Object[] floatCol = row(90L, 4L, 0L, 2L, 0f, 0f);
+    Object[] intCol = row(91L, 4L, 0L, null, 1, 2);
+    Object[] longCol = row(91L, 4L, 0L, null, 1L, 2L);
+    Object[] stringCol = row(99L, 4L, 0L, null, "1", "2");
+
+    Object[] metrics =
+        row(
+            binaryCol,
+            booleanCol,
+            decimalCol,
+            doubleCol,
+            fixedCol,
+            floatCol,
+            intCol,
+            longCol,
+            stringCol);
+
+    assertEquals(
+        "Row should match",
+        ImmutableList.of(new Object[] {metrics}),
+        sql("SELECT readable_metrics FROM %s.files", tableName));
+  }
+
+  @Test
+  public void testSelectPrimitiveValues() throws Exception {
+    createPrimitiveTable();
+
+    assertEquals(
+        "select of primitive readable_metrics fields should work",
+        ImmutableList.of(row(1, true)),
+        sql(
+            "SELECT readable_metrics.intCol.lower_bound, readable_metrics.booleanCol.upper_bound FROM %s.files",
+            tableName));
+
+    assertEquals(
+        "mixed select of readable_metrics and other field should work",
+        ImmutableList.of(row(0, 4L)),
+        sql("SELECT content, readable_metrics.longCol.value_count FROM %s.files", tableName));
+
+    assertEquals(
+        "mixed select of readable_metrics and other field should work, in the other order",
+        ImmutableList.of(row(4L, 0)),
+        sql("SELECT readable_metrics.longCol.value_count, content FROM %s.files", tableName));
+  }
+
+  @Test
+  public void testSelectNestedValues() throws Exception {
+    createNestedTable();
+
+    assertEquals(
+        "select of nested readable_metrics fields should work",
+        ImmutableList.of(row(0L, 3L)),
+        sql(
+            "SELECT readable_metrics.`nestedStructCol.leafStructCol.leafLongCol`.lower_bound, "
+                + "readable_metrics.`nestedStructCol.leafStructCol.leafDoubleCol`.value_count FROM %s.files",
+            tableName));
+  }
+
+  @Test
+  public void testNestedValues() throws Exception {
+    createNestedTable();
+
+    Object[] leafDoubleCol = row(53L, 3L, 1L, 1L, 0.0D, 0.0D);
+    Object[] leafLongCol = row(54L, 3L, 1L, null, 0L, 1L);
+    Object[] metrics = row(leafDoubleCol, leafLongCol);
+
+    assertEquals(
+        "Row should match",
+        ImmutableList.of(new Object[] {metrics}),
+        sql("SELECT readable_metrics FROM %s.files", tableName));
+  }
+}

Reply via email to