This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9a00f7477d Core: Add readable_metrics columns to files metadata tables (#5376)
9a00f7477d is described below
commit 9a00f7477dedac4501fb2de9e1e6d7aa83dc20b7
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Dec 5 10:08:50 2022 -0800
Core: Add readable_metrics columns to files metadata tables (#5376)
---
api/src/main/java/org/apache/iceberg/Schema.java | 9 +
.../java/org/apache/iceberg/types/TypeUtil.java | 18 +-
.../java/org/apache/iceberg/BaseFilesTable.java | 93 ++++++-
.../main/java/org/apache/iceberg/MetricsUtil.java | 300 +++++++++++++++++++++
.../org/apache/iceberg/TestMetadataTableScans.java | 87 ++++++
.../spark/source/TestIcebergSourceTablesBase.java | 75 +++---
.../spark/source/TestIcebergSourceTablesBase.java | 77 +++---
.../spark/extensions/TestMetadataTables.java | 144 +++++-----
.../org/apache/iceberg/spark/data/TestHelpers.java | 28 ++
.../spark/source/TestIcebergSourceTablesBase.java | 73 +++--
.../spark/extensions/TestMetadataTables.java | 144 +++++-----
.../org/apache/iceberg/spark/data/TestHelpers.java | 28 ++
.../spark/source/TestIcebergSourceTablesBase.java | 74 +++--
.../source/TestMetadataTableReadableMetrics.java | 286 ++++++++++++++++++++
14 files changed, 1156 insertions(+), 280 deletions(-)
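For orientation before the diffs: the new readable_metrics column surfaces each data file's
per-column metrics (sizes, value counts, bounds) in human-readable form on the files metadata
tables. A minimal usage sketch, assuming a running Spark session and an Iceberg table
db.sample with a primitive column named id (table and column names are illustrative, not part
of this commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadableMetricsQueryExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // readable_metrics is appended as the last column of the files metadata table
        Dataset<Row> files = spark.read().format("iceberg").load("db.sample.files");

        // metrics are nested per column: readable_metrics.<column>.<metric>
        files.select("file_path", "readable_metrics.id.null_value_count").show(false);
      }
    }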
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java
index 34105a00ad..5e024b7c1c 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -233,6 +233,15 @@ public class Schema implements Serializable {
return aliasToId;
}
+ /**
+ * Returns a map of field id to qualified field name for this schema.
+ *
+ * @return a map of field id to qualified field names
+ */
+ public Map<Integer, String> idToName() {
+ return lazyIdToName();
+ }
+
/**
* Returns the underlying {@link StructType struct type} for this schema.
*
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index d2b25fb598..4531f1b8ba 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -142,9 +142,21 @@ public class TypeUtil {
}
public static Schema join(Schema left, Schema right) {
- List<Types.NestedField> joinedColumns = Lists.newArrayList();
- joinedColumns.addAll(left.columns());
- joinedColumns.addAll(right.columns());
+ List<Types.NestedField> joinedColumns = Lists.newArrayList(left.columns());
+ for (Types.NestedField rightColumn : right.columns()) {
+ Types.NestedField leftColumn = left.findField(rightColumn.fieldId());
+
+ if (leftColumn == null) {
+ joinedColumns.add(rightColumn);
+ } else {
+ Preconditions.checkArgument(
+ leftColumn.equals(rightColumn),
+ "Schemas have different columns with same id: %s, %s",
+ leftColumn,
+ rightColumn);
+ }
+ }
+
return new Schema(joinedColumns);
}
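Note on the change above: join previously concatenated the two column lists, which could
produce duplicate field ids; it now drops right-side fields that are identical to a left-side
field with the same id, and rejects conflicting definitions. A small sketch of the new
semantics (the schemas are illustrative):

    import static org.apache.iceberg.types.Types.NestedField.required;

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;

    public class TypeUtilJoinExample {
      public static void main(String[] args) {
        Schema left = new Schema(
            required(1, "id", Types.IntegerType.get()),
            required(2, "data", Types.StringType.get()));
        // repeats field 1 with an identical definition and adds field 3
        Schema right = new Schema(
            required(1, "id", Types.IntegerType.get()),
            required(3, "category", Types.StringType.get()));

        // the identical duplicate is dropped; the result holds fields 1, 2 and 3.
        // a right-side field reusing id 1 with a different name or type would
        // instead fail the Preconditions check.
        System.out.println(TypeUtil.join(left, right));
      }
    }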
diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 4925506fe9..119d4743f1 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -21,6 +21,8 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -32,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
/** Base class logic for files metadata tables */
@@ -48,10 +51,10 @@ abstract class BaseFilesTable extends BaseMetadataTable {
if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition
// field
- return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
- } else {
- return schema;
+ schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
}
+
+ return TypeUtil.join(schema, MetricsUtil.readableMetricsSchema(table().schema(), schema));
}
private static CloseableIterable<FileScanTask> planFiles(
@@ -140,15 +143,17 @@ abstract class BaseFilesTable extends BaseMetadataTable {
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projection;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
@@ -156,26 +161,54 @@ abstract class BaseFilesTable extends BaseMetadataTable {
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projection = projection;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file -> (StructLike) file);
+ Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);
+
+ if (readableMetricsField == null) {
+ return CloseableIterable.transform(files(projection), file -> (StructLike) file);
+ } else {
+ // Remove virtual columns from the file projection and ensure that the underlying metrics
+ // used to create those columns are part of the file projection
+ Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
+ Schema fileProjection = TypeUtil.selectNot(projection, readableMetricsIds);
+
+ Schema projectionForReadableMetrics =
+ new Schema(
+ MetricsUtil.READABLE_METRIC_COLS.stream()
+ .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
+ .collect(Collectors.toList()));
+
+ Schema projectionForMetrics = TypeUtil.join(fileProjection, projectionForReadableMetrics);
+ return CloseableIterable.transform(files(projectionForMetrics), this::withReadableMetrics);
+ }
}
- private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ private CloseableIterable<? extends ContentFile<?>> files(Schema fileProjection) {
switch (manifest.content()) {
case DATA:
- return ManifestFiles.read(manifest, io, specsById).project(schema);
+ return ManifestFiles.read(manifest, io, specsById).project(fileProjection);
case DELETES:
- return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(schema);
+ return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
}
}
+ private StructLike withReadableMetrics(ContentFile<?> file) {
+ int expectedSize = projection.columns().size();
+ StructType projectedMetricType =
+ projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
+ MetricsUtil.ReadableMetricsStruct readableMetrics =
+ MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
+ return new ContentFileStructWithMetrics(expectedSize, (StructLike) file, readableMetrics);
+ }
+
@Override
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
@@ -186,4 +219,44 @@ abstract class BaseFilesTable extends BaseMetadataTable {
return manifest;
}
}
+
+ static class ContentFileStructWithMetrics implements StructLike {
+ private final StructLike fileAsStruct;
+ private final MetricsUtil.ReadableMetricsStruct readableMetrics;
+ private final int expectedSize;
+
+ ContentFileStructWithMetrics(
+ int expectedSize,
+ StructLike fileAsStruct,
+ MetricsUtil.ReadableMetricsStruct readableMetrics) {
+ this.fileAsStruct = fileAsStruct;
+ this.readableMetrics = readableMetrics;
+ this.expectedSize = expectedSize;
+ }
+
+ @Override
+ public int size() {
+ return expectedSize;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ int lastExpectedIndex = expectedSize - 1;
+ if (pos < lastExpectedIndex) {
+ return fileAsStruct.get(pos, javaClass);
+ } else if (pos == lastExpectedIndex) {
+ return javaClass.cast(readableMetrics);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Illegal position access for ContentFileStructWithMetrics: %d, max allowed is %d",
+ pos, lastExpectedIndex));
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("ContentFileStructWithMetrics is read only");
+ }
+ }
}
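The wrapper above is purely positional: every slot of the projection except the last delegates
to the underlying file struct, and the final slot returns the readable metrics. The projected
size is passed in explicitly because the backing file struct can be wider than the projection.
A hypothetical stand-alone sketch of the same delegation pattern (AppendingStruct is
illustrative and not part of this commit):

    import org.apache.iceberg.StructLike;

    class AppendingStruct implements StructLike {
      private final StructLike base;
      private final Object appended;
      private final int size;

      AppendingStruct(int size, StructLike base, Object appended) {
        this.size = size;
        this.base = base;
        this.appended = appended;
      }

      @Override
      public int size() {
        return size;
      }

      @Override
      public <T> T get(int pos, Class<T> javaClass) {
        // positions 0..size-2 come from the backing struct; the last holds the extra value
        return pos < size - 1 ? base.get(pos, javaClass) : javaClass.cast(appended);
      }

      @Override
      public <T> void set(int pos, T value) {
        throw new UnsupportedOperationException("read only");
      }
    }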
diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
index 98710ef79e..cde9bcb4a0 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -18,11 +18,23 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
public class MetricsUtil {
@@ -56,4 +68,292 @@ public class MetricsUtil {
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ public static final List<ReadableMetricColDefinition> READABLE_METRIC_COLS =
+ ImmutableList.of(
+ new ReadableMetricColDefinition(
+ "column_size",
+ "Total size on disk",
+ DataFile.COLUMN_SIZES,
+ field -> Types.LongType.get(),
+ (file, field) ->
+ file.columnSizes() == null ? null : file.columnSizes().get(field.fieldId())),
+ new ReadableMetricColDefinition(
+ "value_count",
+ "Total count, including null and NaN",
+ DataFile.VALUE_COUNTS,
+ field -> Types.LongType.get(),
+ (file, field) ->
+ file.valueCounts() == null ? null : file.valueCounts().get(field.fieldId())),
+ new ReadableMetricColDefinition(
+ "null_value_count",
+ "Null value count",
+ DataFile.NULL_VALUE_COUNTS,
+ field -> Types.LongType.get(),
+ (file, field) ->
+ file.nullValueCounts() == null
+ ? null
+ : file.nullValueCounts().get(field.fieldId())),
+ new ReadableMetricColDefinition(
+ "nan_value_count",
+ "NaN value count",
+ DataFile.NAN_VALUE_COUNTS,
+ field -> Types.LongType.get(),
+ (file, field) ->
+ file.nanValueCounts() == null
+ ? null
+ : file.nanValueCounts().get(field.fieldId())),
+ new ReadableMetricColDefinition(
+ "lower_bound",
+ "Lower bound",
+ DataFile.LOWER_BOUNDS,
+ Types.NestedField::type,
+ (file, field) ->
+ file.lowerBounds() == null
+ ? null
+ : Conversions.fromByteBuffer(
+ field.type(), file.lowerBounds().get(field.fieldId()))),
+ new ReadableMetricColDefinition(
+ "upper_bound",
+ "Upper bound",
+ DataFile.UPPER_BOUNDS,
+ Types.NestedField::type,
+ (file, field) ->
+ file.upperBounds() == null
+ ? null
+ : Conversions.fromByteBuffer(
+ field.type(), file.upperBounds().get(field.fieldId()))));
+
+ public static final String READABLE_METRICS = "readable_metrics";
+
+ /**
+ * Fixed definition of a readable metric column, i.e. a mapping of a raw metric to a readable metric
+ */
+ public static class ReadableMetricColDefinition {
+ private final String name;
+ private final String doc;
+ private final Types.NestedField originalCol;
+ private final TypeFunction typeFunction;
+ private final MetricFunction metricFunction;
+
+ public interface TypeFunction {
+ Type type(Types.NestedField originalCol);
+ }
+
+ public interface MetricFunction {
+ Object metric(ContentFile<?> file, Types.NestedField originalCol);
+ }
+
+ /**
+ * @param name column name
+ * @param doc column doc
+ * @param originalCol original (raw) metric column field on metadata table
+ * @param typeFunction function that returns the readable metric column type from original field
+ * type
+ * @param metricFunction function that returns readable metric from data file
+ */
+ ReadableMetricColDefinition(
+ String name,
+ String doc,
+ Types.NestedField originalCol,
+ TypeFunction typeFunction,
+ MetricFunction metricFunction) {
+ this.name = name;
+ this.doc = doc;
+ this.originalCol = originalCol;
+ this.typeFunction = typeFunction;
+ this.metricFunction = metricFunction;
+ }
+
+ Types.NestedField originalCol() {
+ return originalCol;
+ }
+
+ Type colType(Types.NestedField field) {
+ return typeFunction.type(field);
+ }
+
+ String name() {
+ return name;
+ }
+
+ String doc() {
+ return doc;
+ }
+
+ Object value(ContentFile<?> dataFile, Types.NestedField dataField) {
+ return metricFunction.metric(dataFile, dataField);
+ }
+ }
+
+ /** A struct of readable metric values for a primitive column */
+ public static class ReadableColMetricsStruct implements StructLike {
+
+ private final String columnName;
+ private final Map<Integer, Integer> projectionMap;
+ private final Object[] metrics;
+
+ public ReadableColMetricsStruct(
+ String columnName, Types.NestedField projection, Object... metrics) {
+ this.columnName = columnName;
+ this.projectionMap = readableMetricsProjection(projection);
+ this.metrics = metrics;
+ }
+
+ @Override
+ public int size() {
+ return projectionMap.size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ Object value = get(pos);
+ return value == null ? null : javaClass.cast(value);
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("ReadableColMetricsStruct is read only");
+ }
+
+ private Object get(int pos) {
+ int projectedPos = projectionMap.get(pos);
+ return metrics[projectedPos];
+ }
+
+ /** Returns map of projected position to actual position of this struct's fields */
+ private Map<Integer, Integer> readableMetricsProjection(Types.NestedField projection) {
+ Map<Integer, Integer> result = Maps.newHashMap();
+
+ Set<String> projectedFields =
+ Sets.newHashSet(
+ projection.type().asStructType().fields().stream()
+ .map(Types.NestedField::name)
+ .collect(Collectors.toSet()));
+
+ int projectedIndex = 0;
+ for (int fieldIndex = 0; fieldIndex < READABLE_METRIC_COLS.size(); fieldIndex++) {
+ ReadableMetricColDefinition readableMetric = READABLE_METRIC_COLS.get(fieldIndex);
+
+ if (projectedFields.contains(readableMetric.name())) {
+ result.put(projectedIndex, fieldIndex);
+ projectedIndex++;
+ }
+ }
+ return result;
+ }
+
+ String columnName() {
+ return columnName;
+ }
+ }
+
+ /**
+ * A struct, consisting of all {@link ReadableColMetricsStruct} for all primitive columns of the
+ * table
+ */
+ public static class ReadableMetricsStruct implements StructLike {
+
+ private final List<StructLike> columnMetrics;
+
+ public ReadableMetricsStruct(List<StructLike> columnMetrics) {
+ this.columnMetrics = columnMetrics;
+ }
+
+ @Override
+ public int size() {
+ return columnMetrics.size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ return javaClass.cast(columnMetrics.get(pos));
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("ReadableMetricsStruct is read only");
+ }
+ }
+
+ /**
+ * Calculates a dynamic schema for readable_metrics to add to metadata tables. The type will be
+ * the struct {@link ReadableMetricsStruct}, composed of {@link ReadableColMetricsStruct} for all
+ * primitive columns in the data table
+ *
+ * @param dataTableSchema schema of data table
+ * @param metadataTableSchema schema of existing metadata table (to ensure id uniqueness)
+ * @return schema of readable_metrics struct
+ */
+ public static Schema readableMetricsSchema(Schema dataTableSchema, Schema metadataTableSchema) {
+ List<Types.NestedField> fields = Lists.newArrayList();
+ Map<Integer, String> idToName = dataTableSchema.idToName();
+ AtomicInteger nextId = new AtomicInteger(metadataTableSchema.highestFieldId());
+
+ for (int id : idToName.keySet()) {
+ Types.NestedField field = dataTableSchema.findField(id);
+
+ if (field.type().isPrimitiveType()) {
+ String colName = idToName.get(id);
+
+ fields.add(
+ Types.NestedField.of(
+ nextId.incrementAndGet(),
+ true,
+ colName,
+ Types.StructType.of(
+ READABLE_METRIC_COLS.stream()
+ .map(
+ m ->
+ optional(
+ nextId.incrementAndGet(), m.name(), m.colType(field), m.doc()))
+ .collect(Collectors.toList())),
+ String.format("Metrics for column %s", colName)));
+ }
+ }
+
+ fields.sort(Comparator.comparing(Types.NestedField::name));
+ return new Schema(
+ optional(
+ nextId.incrementAndGet(),
+ "readable_metrics",
+ Types.StructType.of(fields),
+ "Column metrics in readable form"));
+ }
+
+ /**
+ * Return a readable metrics struct row from file metadata
+ *
+ * @param schema schema of original data table
+ * @param file content file with metrics
+ * @param projectedSchema user requested projection
+ * @return {@link ReadableMetricsStruct}
+ */
+ public static ReadableMetricsStruct readableMetricsStruct(
+ Schema schema, ContentFile<?> file, Types.StructType projectedSchema) {
+ Map<Integer, String> idToName = schema.idToName();
+ List<ReadableColMetricsStruct> colMetrics = Lists.newArrayList();
+
+ for (int id : idToName.keySet()) {
+ String qualifiedName = idToName.get(id);
+ Types.NestedField field = schema.findField(id);
+
+ Object[] metrics =
+ READABLE_METRIC_COLS.stream()
+ .map(readableMetric -> readableMetric.value(file, field))
+ .toArray();
+
+ if (field.type().isPrimitiveType() // Iceberg stores metrics only for primitive types
+ && projectedSchema.field(qualifiedName) != null) { // User has requested this column metric
+ colMetrics.add(
+ new ReadableColMetricsStruct(
+ qualifiedName, projectedSchema.field(qualifiedName), metrics));
+ }
+ }
+
+ colMetrics.sort(Comparator.comparing(ReadableColMetricsStruct::columnName));
+ return new ReadableMetricsStruct(
+ colMetrics.stream().map(m -> (StructLike) m).collect(Collectors.toList()));
+ }
}
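A minimal sketch of the schema derivation added above (the two-column table schema is
illustrative; BaseFilesTable passes the metadata table's own schema as the second argument so
that generated field ids stay unique):

    import static org.apache.iceberg.types.Types.NestedField.required;

    import org.apache.iceberg.MetricsUtil;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class ReadableMetricsSchemaExample {
      public static void main(String[] args) {
        Schema dataTableSchema = new Schema(
            required(1, "id", Types.IntegerType.get()),
            required(2, "data", Types.StringType.get()));

        // yields one nested struct per primitive column, each with column_size,
        // value_count, null_value_count, nan_value_count, lower_bound and upper_bound
        Schema readableMetrics =
            MetricsUtil.readableMetricsSchema(dataTableSchema, dataTableSchema);
        System.out.println(readableMetrics);
      }
    }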
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index e816b10db8..a940932209 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -527,6 +527,93 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Assert.assertEquals(expected, scan.schema().asStruct());
}
+ @Test
+ public void testFilesTableReadableMetricsSchema() {
+ Table filesTable = new FilesTable(table.ops(), table);
+ Types.StructType actual = filesTable.newScan().schema().select("readable_metrics").asStruct();
+ int highestId = filesTable.schema().highestFieldId();
+
+ Types.StructType expected =
+ Types.StructType.of(
+ optional(
+ highestId,
+ "readable_metrics",
+ Types.StructType.of(
+ Types.NestedField.optional(
+ highestId - 14,
+ "data",
+ Types.StructType.of(
+ Types.NestedField.optional(
+ highestId - 13,
+ "column_size",
+ Types.LongType.get(),
+ "Total size on disk"),
+ Types.NestedField.optional(
+ highestId - 12,
+ "value_count",
+ Types.LongType.get(),
+ "Total count, including null and NaN"),
+ Types.NestedField.optional(
+ highestId - 11,
+ "null_value_count",
+ Types.LongType.get(),
+ "Null value count"),
+ Types.NestedField.optional(
+ highestId - 10,
+ "nan_value_count",
+ Types.LongType.get(),
+ "NaN value count"),
+ Types.NestedField.optional(
+ highestId - 9,
+ "lower_bound",
+ Types.StringType.get(),
+ "Lower bound"),
+ Types.NestedField.optional(
+ highestId - 8,
+ "upper_bound",
+ Types.StringType.get(),
+ "Upper bound")),
+ "Metrics for column data"),
+ Types.NestedField.optional(
+ highestId - 7,
+ "id",
+ Types.StructType.of(
+ Types.NestedField.optional(
+ highestId - 6,
+ "column_size",
+ Types.LongType.get(),
+ "Total size on disk"),
+ Types.NestedField.optional(
+ highestId - 5,
+ "value_count",
+ Types.LongType.get(),
+ "Total count, including null and NaN"),
+ Types.NestedField.optional(
+ highestId - 4,
+ "null_value_count",
+ Types.LongType.get(),
+ "Null value count"),
+ Types.NestedField.optional(
+ highestId - 3,
+ "nan_value_count",
+ Types.LongType.get(),
+ "NaN value count"),
+ Types.NestedField.optional(
+ highestId - 2,
+ "lower_bound",
+ Types.IntegerType.get(),
+ "Lower bound"),
+ Types.NestedField.optional(
+ highestId - 1,
+ "upper_bound",
+ Types.IntegerType.get(),
+ "Upper bound")),
+ "Metrics for column id")),
+ "Column metrics in readable form"));
+
+ Assert.assertEquals("Dynamic schema for readable_metrics should match", expected, actual);
+ }
+
@Test
public void testPartitionSpecEvolutionAdditive() {
preparePartitionedTable();
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 675a330c84..07d77dc99b 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -55,6 +55,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -68,12 +71,14 @@ import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkException;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Rule;
@@ -420,7 +425,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -443,8 +447,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -463,7 +468,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -474,7 +479,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
spark.sql(
String.format(
@@ -488,6 +492,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
+ NameMapping mapping = MappingUtil.create(table.schema());
+ String mappingJson = NameMappingParser.toJson(mapping);
+
+ table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
+
try {
String stagingLocation = table.location() + "/metadata";
SparkTableUtil.importSparkTable(
@@ -496,12 +505,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
table,
stagingLocation);
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "files"))
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -516,10 +522,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
}
+ Types.StructType struct = nonDerivedSchema(filesTableDs);
Assert.assertEquals("Files table should have two rows", 2, expected.size());
Assert.assertEquals("Actual results should have two rows", 2, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
+ TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
} finally {
spark.sql("DROP TABLE parquet_table");
}
@@ -582,7 +589,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -609,8 +615,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFile(toDelete).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -629,7 +636,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -692,7 +699,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "all_data_files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -718,20 +724,16 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// ensure table data isn't stale
table.refresh();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "all_data_files"))
- .orderBy("file_path")
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest :
+ Iterable<ManifestFile> dataManifests =
Iterables.concat(
- Iterables.transform(
- table.snapshots(), snapshot -> snapshot.dataManifests(table.io())))) {
+ Iterables.transform(table.snapshots(), snapshot -> snapshot.dataManifests(table.io())));
+ for (ManifestFile manifest : dataManifests) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
@@ -750,7 +752,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have two rows", 2, expected.size());
Assert.assertEquals("Actual results should have two rows", 2, actual.size());
for (int i = 0; i < expected.size(); i += 1) {
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
+ TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
}
}
@@ -1751,7 +1753,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.build();
}
- private void asMetadataRecord(GenericData.Record file) {
+ public static void asMetadataRecord(GenericData.Record file) {
file.put(0, FileContent.DATA.id());
file.put(3, 0); // specId
}
@@ -1783,4 +1785,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
return positionDeleteWriter.toDeleteFile();
}
+
+ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+ StructField[] fields = metadataTable.schema().fields();
+ return metadataTable.select(
+ Stream.of(fields)
+ .filter(f -> !f.name().equals("readable_metrics")) // derived field
+ .map(f -> new Column(f.name()))
+ .toArray(Column[]::new));
+ }
+
+ public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+ return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
+ }
}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 1976d71b6c..c51470f19c 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -55,6 +55,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -68,12 +71,14 @@ import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkException;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Rule;
@@ -418,7 +423,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -441,8 +445,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -461,7 +466,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -472,7 +477,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
spark.sql(
String.format(
@@ -486,6 +490,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
+ NameMapping mapping = MappingUtil.create(table.schema());
+ String mappingJson = NameMappingParser.toJson(mapping);
+
+ table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
+
try {
String stagingLocation = table.location() + "/metadata";
SparkTableUtil.importSparkTable(
@@ -494,12 +503,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
table,
stagingLocation);
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "files"))
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -514,10 +520,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
}
+ Types.StructType struct = nonDerivedSchema(filesTableDs);
Assert.assertEquals("Files table should have two rows", 2, expected.size());
Assert.assertEquals("Actual results should have two rows", 2, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
+ TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
} finally {
spark.sql("DROP TABLE parquet_table");
}
@@ -580,7 +587,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -607,8 +613,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFile(toDelete).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -627,7 +634,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -690,7 +697,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "all_data_files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -716,13 +722,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// ensure table data isn't stale
table.refresh();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "all_data_files"))
- .orderBy("file_path")
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
+ List<Row> actual = selectNonDerived(filesTableDs).collectAsList();
actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
@@ -748,7 +750,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have two rows", 2, expected.size());
Assert.assertEquals("Actual results should have two rows", 2, actual.size());
for (int i = 0; i < expected.size(); i += 1) {
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
+ TestHelpers.assertEqualsSafe(nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
}
}
@@ -1789,11 +1791,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.build();
}
- private void asMetadataRecord(GenericData.Record file) {
- file.put(0, FileContent.DATA.id());
- file.put(3, 0); // specId
- }
-
private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
Table table, PartitionSpec spec, StructLike partition) {
OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
@@ -1821,4 +1818,22 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
return positionDeleteWriter.toDeleteFile();
}
+
+ private static void asMetadataRecord(GenericData.Record file) {
+ file.put(0, FileContent.DATA.id());
+ file.put(3, 0); // specId
+ }
+
+ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+ StructField[] fields = metadataTable.schema().fields();
+ return metadataTable.select(
+ Stream.of(fields)
+ .filter(f -> !f.name().equals("readable_metrics")) // derived field
+ .map(f -> new Column(f.name()))
+ .toArray(Column[]::new));
+ }
+
+ public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+ return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
+ }
}
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 64a10bfdcd..a60e0f5d93 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -39,8 +39,10 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -90,8 +92,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".files").schema();
// check delete files table
- List<Row> actualDeleteFiles =
- spark.sql("SELECT * FROM " + tableName + ".delete_files").collectAsList();
+ Dataset<Row> actualDeleteFilesDs = spark.sql("SELECT * FROM " + tableName + ".delete_files");
+ List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
Assert.assertEquals(
"Metadata table should return one delete file", 1, actualDeleteFiles.size());
@@ -100,22 +102,28 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, null);
Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// check data files table
- List<Row> actualDataFiles =
- spark.sql("SELECT * FROM " + tableName + ".data_files").collectAsList();
+ Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + ".data_files");
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDataFilesDs),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
// check all files table
- List<Row> actualFiles =
- spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content").collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
List<Record> expectedFiles =
@@ -123,9 +131,9 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
.collect(Collectors.toList());
Assert.assertEquals("Should have two files manifest entries", 2, expectedFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}
@Test
@@ -175,27 +183,31 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Assert.assertEquals(
"Should have one delete file manifest entry", 1, expectedDeleteFiles.size());
- List<Row> actualDeleteFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDeleteFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'");
+ List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
+
Assert.assertEquals(
"Metadata table should return one delete file", 1, actualDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// Check data files table
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
Assert.assertEquals("Should have one data file manifest entry", 1, expectedDataFiles.size());
- List<Row> actualDataFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".data_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDataFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".data_files " + "WHERE partition.data='a'");
+
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDataFilesDs),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
List<Row> actualPartitionsWithProjection =
spark.sql("SELECT file_count FROM " + tableName + ".partitions ").collectAsList();
@@ -213,19 +225,15 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
.collect(Collectors.toList());
Assert.assertEquals("Should have two file manifest entries", 2, expectedFiles.size());
- List<Row> actualFiles =
- spark
- .sql(
- "SELECT * FROM "
- + tableName
- + ".files "
- + "WHERE partition.data='a' ORDER BY content")
- .collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql(
+ "SELECT * FROM " + tableName + ".files " + "WHERE partition.data='a' ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}
@Test
@@ -265,19 +273,22 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
// Check all data files table
- List<Row> actualDataFiles =
- spark.sql("SELECT * FROM " + tableName + ".all_data_files").collectAsList();
+ Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + ".all_data_files");
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDataFilesDs),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
// Check all delete files table
- List<Row> actualDeleteFiles =
- spark.sql("SELECT * FROM " + tableName + ".all_delete_files").collectAsList();
+ Dataset<Row> actualDeleteFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_delete_files");
+ List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, null);
@@ -285,15 +296,19 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Assert.assertEquals(
"Metadata table should return one delete file", 1, actualDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// Check all files table
- List<Row> actualFiles =
- spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content").collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content");
+ List<Row> actualFiles = actualFilesDs.collectAsList();
List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
- TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, actualFiles);
}
@Test
@@ -341,22 +356,24 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
// Check all data files table
- List<Row> actualDataFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDataFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE partition.data='a'");
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema())
+ .asStruct(),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
// Check all delete files table
- List<Row> actualDeleteFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDeleteFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE partition.data='a'");
+ List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
+
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, "a");
@@ -364,21 +381,24 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Assert.assertEquals("Metadata table should return one delete file", 1, actualDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// Check all files table
- List<Row> actualFiles =
- spark
- .sql(
- "SELECT * FROM "
- + tableName
- + ".all_files WHERE partition.data='a' "
- + "ORDER BY content")
- .collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql(
+ "SELECT * FROM "
+ + tableName
+ + ".all_files WHERE partition.data='a' "
+ + "ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
- TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualDataFilesDs), expectedFiles, actualFiles);
}
@Test
@@ -615,7 +635,7 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
if ((Integer) record.get("status") < 2 /* added or existing */) {
Record file = (Record) record.get("data_file");
if (partitionMatch(file, partValue)) {
- asMetadataRecord(file, expectedContent);
+ TestHelpers.asMetadataRecord(file, expectedContent);
expected.add(file);
}
}
@@ -625,12 +645,6 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
return expected;
}
- // Populate certain fields derived in the metadata tables
- private void asMetadataRecord(Record file, FileContent content) {
- file.put(0, content.id());
- file.put(3, 0); // specId
- }
-
private boolean partitionMatch(Record file, String partValue) {
if (partValue == null) {
return true;
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index e5ad0ca213..59074bbd92 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -38,12 +38,14 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
@@ -51,10 +53,13 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -817,4 +822,27 @@ public class TestHelpers {
.map(ManifestFile::path)
.collect(Collectors.toSet());
}
+
+ public static void asMetadataRecord(GenericData.Record file, FileContent content) {
+ file.put(0, content.id());
+ file.put(3, 0); // specId
+ }
+
+ public static void asMetadataRecord(GenericData.Record file) {
+ file.put(0, FileContent.DATA.id());
+ file.put(3, 0); // specId
+ }
+
+ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+ StructField[] fields = metadataTable.schema().fields();
+ return metadataTable.select(
+ Stream.of(fields)
+ .filter(f -> !f.name().equals("readable_metrics")) // derived field
+ .map(f -> new Column(f.name()))
+ .toArray(Column[]::new));
+ }
+
+ public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+ return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
+ }
}
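
Taken together, the helpers added above define the comparison pattern used throughout these test changes: readable_metrics is derived per file rather than stored, so rows are projected down to the stored columns and compared against the matching Iceberg struct. A hedged usage sketch (spark, expected, and the table name db.tbl stand in for the usual test fixtures):

    Dataset<Row> filesDs = spark.read().format("iceberg").load("db.tbl.files");
    List<Row> actual = TestHelpers.selectNonDerived(filesDs).collectAsList();
    // nonDerivedSchema converts the projected Spark schema back into an Iceberg
    // struct, so assertEqualsSafe compares exactly the non-derived columns.
    TestHelpers.assertEqualsSafe(
        TestHelpers.nonDerivedSchema(filesDs), expected.get(0), actual.get(0));
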
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 57ce654a71..361011dd1c 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -36,7 +36,6 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -55,6 +54,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -173,7 +175,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
row.put(2, 0L); // data sequence number
row.put(3, 0L); // file sequence number
GenericData.Record file = (GenericData.Record) row.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(row);
});
}
@@ -367,7 +369,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
row.put(2, 0L); // data sequence number
row.put(3, 0L); // file sequence number
GenericData.Record file = (GenericData.Record) row.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(row);
});
}
@@ -418,7 +420,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -441,8 +442,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -452,7 +454,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
@@ -461,7 +463,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -472,7 +475,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
spark.sql(
String.format(
@@ -486,6 +488,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
+ NameMapping mapping = MappingUtil.create(table.schema());
+ String mappingJson = NameMappingParser.toJson(mapping);
+
+ table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
+
try {
String stagingLocation = table.location() + "/metadata";
SparkTableUtil.importSparkTable(
@@ -494,12 +501,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
table,
stagingLocation);
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "files"))
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -508,16 +512,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
}
+ Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
Assert.assertEquals("Files table should have one row", 2,
expected.size());
Assert.assertEquals("Actual results should have one row", 2,
actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(),
expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(),
expected.get(1), actual.get(1));
+ TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
} finally {
spark.sql("DROP TABLE parquet_table");
}
@@ -580,7 +585,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -607,8 +611,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFile(toDelete).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -618,7 +623,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
@@ -627,7 +632,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -690,7 +696,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "all_data_files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -716,13 +721,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// ensure table data isn't stale
table.refresh();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "all_data_files"))
- .orderBy("file_path")
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
@@ -736,7 +737,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
@@ -748,7 +749,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have two rows", 2, expected.size());
Assert.assertEquals("Actual results should have two rows", 2, actual.size());
for (int i = 0; i < expected.size(); i += 1) {
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
}
}
@@ -1806,11 +1808,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.build();
}
- private void asMetadataRecord(GenericData.Record file) {
- file.put(0, FileContent.DATA.id());
- file.put(3, 0); // specId
- }
-
private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
Table table, PartitionSpec spec, StructLike partition) {
OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
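
Besides the helper migration, the one behavioral addition in this file is the name-mapping setup in the snapshot-inheritance test: imported Parquet files carry no Iceberg field ids, so a default name mapping is registered before the import, letting the new per-column metrics resolve fields by name. Condensed from the hunk above:

    NameMapping mapping = MappingUtil.create(table.schema());
    String mappingJson = NameMappingParser.toJson(mapping);
    table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
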
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 64a10bfdcd..a60e0f5d93 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -39,8 +39,10 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -90,8 +92,8 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".files").schema();
// check delete files table
- List<Row> actualDeleteFiles =
- spark.sql("SELECT * FROM " + tableName + ".delete_files").collectAsList();
+ Dataset<Row> actualDeleteFilesDs = spark.sql("SELECT * FROM " + tableName + ".delete_files");
+ List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
Assert.assertEquals(
"Metadata table should return one delete file", 1, actualDeleteFiles.size());
@@ -100,22 +102,28 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, null);
Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// check data files table
- List<Row> actualDataFiles =
- spark.sql("SELECT * FROM " + tableName + ".data_files").collectAsList();
+ Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + ".data_files");
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDataFilesDs),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
// check all files table
- List<Row> actualFiles =
- spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content").collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".files ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
List<Record> expectedFiles =
@@ -123,9 +131,9 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
.collect(Collectors.toList());
Assert.assertEquals("Should have two files manifest entries", 2, expectedFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}
@Test
@@ -175,27 +183,31 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Assert.assertEquals(
"Should have one delete file manifest entry", 1, expectedDeleteFiles.size());
- List<Row> actualDeleteFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDeleteFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'");
+ List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
+
Assert.assertEquals(
"Metadata table should return one delete file", 1, actualDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// Check data files table
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
Assert.assertEquals("Should have one data file manifest entry", 1, expectedDataFiles.size());
- List<Row> actualDataFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".data_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDataFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".data_files " + "WHERE partition.data='a'");
+
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDataFilesDs),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
List<Row> actualPartitionsWithProjection =
spark.sql("SELECT file_count FROM " + tableName + ".partitions
").collectAsList();
@@ -213,19 +225,15 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
.collect(Collectors.toList());
Assert.assertEquals("Should have two file manifest entries", 2, expectedFiles.size());
- List<Row> actualFiles =
- spark
- .sql(
- "SELECT * FROM "
- + tableName
- + ".files "
- + "WHERE partition.data='a' ORDER BY content")
- .collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql(
+ "SELECT * FROM " + tableName + ".files " + "WHERE partition.data='a' ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(0), actualFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedFiles.get(1), actualFiles.get(1));
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}
@Test
@@ -265,19 +273,22 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
// Check all data files table
- List<Row> actualDataFiles =
- spark.sql("SELECT * FROM " + tableName + ".all_data_files").collectAsList();
+ Dataset<Row> actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + ".all_data_files");
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDataFilesDs),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
// Check all delete files table
- List<Row> actualDeleteFiles =
- spark.sql("SELECT * FROM " + tableName + ".all_delete_files").collectAsList();
+ Dataset<Row> actualDeleteFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_delete_files");
+ List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, null);
@@ -285,15 +296,19 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Assert.assertEquals(
"Metadata table should return one delete file", 1, actualDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// Check all files table
- List<Row> actualFiles =
- spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content").collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content");
+ List<Row> actualFiles = actualFilesDs.collectAsList();
List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
- TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles, actualFiles);
}
@Test
@@ -341,22 +356,24 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
// Check all data files table
- List<Row> actualDataFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDataFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_data_files " + "WHERE partition.data='a'");
+ List<Row> actualDataFiles = TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList();
List<Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
Assert.assertEquals("Metadata table should return one data file", 1,
actualDataFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDataFiles.get(0),
actualDataFiles.get(0));
+
SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema())
+ .asStruct(),
+ expectedDataFiles.get(0),
+ actualDataFiles.get(0));
// Check all delete files table
- List<Row> actualDeleteFiles =
- spark
- .sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE partition.data='a'")
- .collectAsList();
+ Dataset<Row> actualDeleteFilesDs =
+ spark.sql("SELECT * FROM " + tableName + ".all_delete_files " + "WHERE partition.data='a'");
+ List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
+
List<Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.POSITION_DELETES, entriesTableSchema, expectedDeleteManifests, "a");
@@ -364,21 +381,24 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Assert.assertEquals("Metadata table should return one delete file", 1, actualDeleteFiles.size());
TestHelpers.assertEqualsSafe(
- filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+ TestHelpers.nonDerivedSchema(actualDeleteFilesDs),
+ expectedDeleteFiles.get(0),
+ actualDeleteFiles.get(0));
// Check all files table
- List<Row> actualFiles =
- spark
- .sql(
- "SELECT * FROM "
- + tableName
- + ".all_files WHERE partition.data='a' "
- + "ORDER BY content")
- .collectAsList();
+ Dataset<Row> actualFilesDs =
+ spark.sql(
+ "SELECT * FROM "
+ + tableName
+ + ".all_files WHERE partition.data='a' "
+ + "ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+
List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
- TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualDataFilesDs), expectedFiles, actualFiles);
}
@Test
@@ -615,7 +635,7 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
if ((Integer) record.get("status") < 2 /* added or existing */) {
Record file = (Record) record.get("data_file");
if (partitionMatch(file, partValue)) {
- asMetadataRecord(file, expectedContent);
+ TestHelpers.asMetadataRecord(file, expectedContent);
expected.add(file);
}
}
@@ -625,12 +645,6 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
return expected;
}
- // Populate certain fields derived in the metadata tables
- private void asMetadataRecord(Record file, FileContent content) {
- file.put(0, content.id());
- file.put(3, 0); // specId
- }
-
private boolean partitionMatch(Record file, String partValue) {
if (partValue == null) {
return true;
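
As in the 3.2 module, the private asMetadataRecord helper is dropped here in favor of the shared TestHelpers version. Its job is unchanged: expected records built from manifest entries lack the fields the files tables fill in, so tests patch them before comparing (ordinal 0 is content and ordinal 3 is spec_id in the files table schema). For example:

    // Delete-file rows carry their FileContent id explicitly; the one-argument
    // overload defaults data-file rows to FileContent.DATA.
    TestHelpers.asMetadataRecord(file, FileContent.POSITION_DELETES);
    TestHelpers.asMetadataRecord(file);
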
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index e5ad0ca213..59074bbd92 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -38,12 +38,14 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
@@ -51,10 +53,13 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -817,4 +822,27 @@ public class TestHelpers {
.map(ManifestFile::path)
.collect(Collectors.toSet());
}
+
+ public static void asMetadataRecord(GenericData.Record file, FileContent content) {
+ file.put(0, content.id());
+ file.put(3, 0); // specId
+ }
+
+ public static void asMetadataRecord(GenericData.Record file) {
+ file.put(0, FileContent.DATA.id());
+ file.put(3, 0); // specId
+ }
+
+ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+ StructField[] fields = metadataTable.schema().fields();
+ return metadataTable.select(
+ Stream.of(fields)
+ .filter(f -> !f.name().equals("readable_metrics")) // derived field
+ .map(f -> new Column(f.name()))
+ .toArray(Column[]::new));
+ }
+
+ public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+ return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
+ }
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 57ce654a71..86bb980953 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -36,7 +36,6 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -55,6 +54,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -173,7 +175,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
row.put(2, 0L); // data sequence number
row.put(3, 0L); // file sequence number
GenericData.Record file = (GenericData.Record) row.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(row);
});
}
@@ -367,7 +369,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
row.put(2, 0L); // data sequence number
row.put(3, 0L); // file sequence number
GenericData.Record file = (GenericData.Record) row.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(row);
});
}
@@ -418,7 +420,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -441,8 +442,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -452,7 +454,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
@@ -461,7 +463,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -472,7 +476,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
spark.sql(
String.format(
@@ -486,6 +489,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
+ NameMapping mapping = MappingUtil.create(table.schema());
+ String mappingJson = NameMappingParser.toJson(mapping);
+
+ table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
+
try {
String stagingLocation = table.location() + "/metadata";
SparkTableUtil.importSparkTable(
@@ -494,12 +502,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
table,
stagingLocation);
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "files"))
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -508,16 +513,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
}
+ Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
Assert.assertEquals("Files table should have one row", 2,
expected.size());
Assert.assertEquals("Actual results should have one row", 2,
actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(),
expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(),
expected.get(1), actual.get(1));
+ TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
} finally {
spark.sql("DROP TABLE parquet_table");
}
@@ -580,7 +586,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -607,8 +612,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// delete the first file to test that only live files are listed
table.newDelete().deleteFile(toDelete).commit();
- List<Row> actual =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
@@ -618,7 +624,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
@@ -627,7 +633,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have one row", 1, expected.size());
Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
}
@Test
@@ -690,7 +697,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table entriesTable = loadTable(tableIdentifier, "entries");
- Table filesTable = loadTable(tableIdentifier, "all_data_files");
Dataset<Row> df1 =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -716,13 +722,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// ensure table data isn't stale
table.refresh();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "all_data_files"))
- .orderBy("file_path")
- .collectAsList();
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
@@ -736,7 +738,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
- asMetadataRecord(file);
+ TestHelpers.asMetadataRecord(file);
expected.add(file);
}
}
@@ -748,7 +750,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Assert.assertEquals("Files table should have two rows", 2, expected.size());
Assert.assertEquals("Actual results should have two rows", 2, actual.size());
for (int i = 0; i < expected.size(); i += 1) {
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
}
}
@@ -1806,11 +1809,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.build();
}
- private void asMetadataRecord(GenericData.Record file) {
- file.put(0, FileContent.DATA.id());
- file.put(3, 0); // specId
- }
-
private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
Table table, PartitionSpec spec, StructLike partition) {
OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
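
A small observation on selectNonDerived while reading these projections: the helper rebuilds the column list explicitly so it can share logic with nonDerivedSchema, but for a single derived column a plain Spark drop would be an equivalent projection. A hedged one-liner (not part of the commit):

    // Same projection as TestHelpers.selectNonDerived for one derived column:
    Dataset<Row> withoutDerived = filesTableDs.drop("readable_metrics");
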
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java
new file mode 100644
index 0000000000..343943b0f8
--- /dev/null
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestMetadataTableReadableMetrics extends SparkTestBaseWithCatalog {
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ private static final Types.StructType LEAF_STRUCT_TYPE =
+ Types.StructType.of(
+ optional(1, "leafLongCol", Types.LongType.get()),
+ optional(2, "leafDoubleCol", Types.DoubleType.get()));
+
+ private static final Types.StructType NESTED_STRUCT_TYPE =
+ Types.StructType.of(required(3, "leafStructCol", LEAF_STRUCT_TYPE));
+
+ private static final Schema NESTED_SCHEMA =
+ new Schema(required(4, "nestedStructCol", NESTED_STRUCT_TYPE));
+
+ private static final Schema PRIMITIVE_SCHEMA =
+ new Schema(
+ required(1, "booleanCol", Types.BooleanType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "longCol", Types.LongType.get()),
+ required(4, "floatCol", Types.FloatType.get()),
+ required(5, "doubleCol", Types.DoubleType.get()),
+ optional(6, "decimalCol", Types.DecimalType.of(10, 2)),
+ optional(7, "stringCol", Types.StringType.get()),
+ optional(8, "fixedCol", Types.FixedType.ofLength(3)),
+ optional(9, "binaryCol", Types.BinaryType.get()));
+
+ public TestMetadataTableReadableMetrics() {
+ // only SparkCatalog supports metadata table sql queries
+ super(SparkCatalogConfig.HIVE);
+ }
+
+ protected String tableName() {
+ return tableName.split("\\.")[2];
+ }
+
+ protected String database() {
+ return tableName.split("\\.")[1];
+ }
+
+ private Table createPrimitiveTable() throws IOException {
+ Table table =
+ catalog.createTable(
+ TableIdentifier.of(Namespace.of(database()), tableName()),
+ PRIMITIVE_SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of());
+ List<Record> records =
+ Lists.newArrayList(
+ createPrimitiveRecord(
+ false,
+ 1,
+ 1L,
+ 0,
+ 1.0D,
+ new BigDecimal("1.00"),
+ "1",
+ Base64.getDecoder().decode("1111"),
+ ByteBuffer.wrap(Base64.getDecoder().decode("1111"))),
+ createPrimitiveRecord(
+ true,
+ 2,
+ 2L,
+ 0,
+ 2.0D,
+ new BigDecimal("2.00"),
+ "2",
+ Base64.getDecoder().decode("2222"),
+ ByteBuffer.wrap(Base64.getDecoder().decode("2222"))),
+ createPrimitiveRecord(false, 1, 1, Float.NaN, Double.NaN, null, "1", null, null),
+ createPrimitiveRecord(
+ false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2", null, null));
+
+ DataFile dataFile =
+ FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records);
+ table.newAppend().appendFile(dataFile).commit();
+ return table;
+ }
+
+ private void createNestedTable() throws IOException {
+ Table table =
+ catalog.createTable(
+ TableIdentifier.of(Namespace.of(database()), tableName()),
+ NESTED_SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of());
+
+ List<Record> records =
+ Lists.newArrayList(
+ createNestedRecord(0L, 0.0),
+ createNestedRecord(1L, Double.NaN),
+ createNestedRecord(null, null));
+ DataFile dataFile =
+ FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records);
+ table.newAppend().appendFile(dataFile).commit();
+ }
+
+ @After
+ public void dropTable() {
+ sql("DROP TABLE %s", tableName);
+ }
+
+ private Dataset<Row> filesDf() {
+ return spark.read().format("iceberg").load(database() + "." + tableName()
+ ".files");
+ }
+
+ protected GenericRecord createPrimitiveRecord(
+ boolean booleanCol,
+ int intCol,
+ long longCol,
+ float floatCol,
+ double doubleCol,
+ BigDecimal decimalCol,
+ String stringCol,
+ byte[] fixedCol,
+ ByteBuffer binaryCol) {
+ GenericRecord record = GenericRecord.create(PRIMITIVE_SCHEMA);
+ record.set(0, booleanCol);
+ record.set(1, intCol);
+ record.set(2, longCol);
+ record.set(3, floatCol);
+ record.set(4, doubleCol);
+ record.set(5, decimalCol);
+ record.set(6, stringCol);
+ record.set(7, fixedCol);
+ record.set(8, binaryCol);
+ return record;
+ }
+
+ private GenericRecord createNestedRecord(Long longCol, Double doubleCol) {
+ GenericRecord record = GenericRecord.create(NESTED_SCHEMA);
+ GenericRecord nested = GenericRecord.create(NESTED_STRUCT_TYPE);
+ GenericRecord leaf = GenericRecord.create(LEAF_STRUCT_TYPE);
+ leaf.set(0, longCol);
+ leaf.set(1, doubleCol);
+ nested.set(0, leaf);
+ record.set(0, nested);
+ return record;
+ }
+
+ @Test
+ public void testPrimitiveColumns() throws Exception {
+ createPrimitiveTable();
+
+ Object[] binaryCol =
+ row(
+ 59L,
+ 4L,
+ 2L,
+ null,
+ Base64.getDecoder().decode("1111"),
+ Base64.getDecoder().decode("2222"));
+ Object[] booleanCol = row(44L, 4L, 0L, null, false, true);
+ Object[] decimalCol = row(97L, 4L, 1L, null, new BigDecimal("1.00"), new
BigDecimal("2.00"));
+ Object[] doubleCol = row(99L, 4L, 0L, 1L, 1.0D, 2.0D);
+ Object[] fixedCol =
+ row(
+ 55L,
+ 4L,
+ 2L,
+ null,
+ Base64.getDecoder().decode("1111"),
+ Base64.getDecoder().decode("2222"));
+ Object[] floatCol = row(90L, 4L, 0L, 2L, 0f, 0f);
+ Object[] intCol = row(91L, 4L, 0L, null, 1, 2);
+ Object[] longCol = row(91L, 4L, 0L, null, 1L, 2L);
+ Object[] stringCol = row(99L, 4L, 0L, null, "1", "2");
+
+ Object[] metrics =
+ row(
+ binaryCol,
+ booleanCol,
+ decimalCol,
+ doubleCol,
+ fixedCol,
+ floatCol,
+ intCol,
+ longCol,
+ stringCol);
+
+ assertEquals(
+ "Row should match",
+ ImmutableList.of(new Object[] {metrics}),
+ sql("SELECT readable_metrics FROM %s.files", tableName));
+ }
+
+ @Test
+ public void testSelectPrimitiveValues() throws Exception {
+ createPrimitiveTable();
+
+ assertEquals(
+ "select of primitive readable_metrics fields should work",
+ ImmutableList.of(row(1, true)),
+ sql(
+ "SELECT readable_metrics.intCol.lower_bound,
readable_metrics.booleanCol.upper_bound FROM %s.files",
+ tableName));
+
+ assertEquals(
+ "mixed select of readable_metrics and other field should work",
+ ImmutableList.of(row(0, 4L)),
+ sql("SELECT content, readable_metrics.longCol.value_count FROM
%s.files", tableName));
+
+ assertEquals(
+ "mixed select of readable_metrics and other field should work, in the
other order",
+ ImmutableList.of(row(4L, 0)),
+ sql("SELECT readable_metrics.longCol.value_count, content FROM
%s.files", tableName));
+ }
+
+ @Test
+ public void testSelectNestedValues() throws Exception {
+ createNestedTable();
+
+ assertEquals(
+ "select of nested readable_metrics fields should work",
+ ImmutableList.of(row(0L, 3L)),
+ sql(
+ "SELECT
readable_metrics.`nestedStructCol.leafStructCol.leafLongCol`.lower_bound, "
+ +
"readable_metrics.`nestedStructCol.leafStructCol.leafDoubleCol`.value_count
FROM %s.files",
+ tableName));
+ }
+
+ @Test
+ public void testNestedValues() throws Exception {
+ createNestedTable();
+
+ Object[] leafDoubleCol = row(53L, 3L, 1L, 1L, 0.0D, 0.0D);
+ Object[] leafLongCol = row(54L, 3L, 1L, null, 0L, 1L);
+ Object[] metrics = row(leafDoubleCol, leafLongCol);
+
+ assertEquals(
+ "Row should match",
+ ImmutableList.of(new Object[] {metrics}),
+ sql("SELECT readable_metrics FROM %s.files", tableName));
+ }
+}
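
For anyone trying the feature out, the new test class doubles as usage documentation: readable_metrics holds one struct per leaf column with its size, value/null/NaN counts, and typed lower and upper bounds, and nested leaves are addressed by their backquoted dotted name. A few illustrative queries against a hypothetical db.tbl, mirroring the tests above:

    // Whole derived struct, one entry per leaf column:
    spark.sql("SELECT readable_metrics FROM db.tbl.files");

    // Individual metrics are plain struct fields:
    spark.sql("SELECT readable_metrics.intCol.lower_bound FROM db.tbl.files");

    // Nested leaves use the quoted dotted column name:
    spark.sql(
        "SELECT readable_metrics.`nestedStructCol.leafStructCol.leafLongCol`.value_count "
            + "FROM db.tbl.files");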