This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b0e4b3a5f Core: Fix metrics column limit with nested columns (#13039)
1b0e4b3a5f is described below

commit 1b0e4b3a5f6df535828e6c40d381777b1f456df3
Author: Joshua Kolash <[email protected]>
AuthorDate: Thu Aug 7 15:27:02 2025 -0400

    Core: Fix metrics column limit with nested columns (#13039)
    
    Struct columns previously did not follow the 100 column limit and could
    substantially increase the size of column stats in manifest files. This
    updates the limit to apply to 100 columns regardless of nesting, with
    precedence going to columns that are closest to the top level.
---
 .../java/org/apache/iceberg/MetricsConfig.java     | 118 +++++++++++++++++++--
 .../java/org/apache/iceberg/TestMetricsConfig.java |  73 +++++++++++++
 .../java/org/apache/iceberg/TestMetricsModes.java  |  73 +++++++++++++
 .../org/apache/iceberg/io/TestWriterMetrics.java   |  37 +++++++
 4 files changed, 290 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java 
b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 9315fde5d9..75fb0eeec5 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -25,15 +25,20 @@ import static 
org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEF
 import static 
org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX;
 
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.concurrent.Immutable;
 import org.apache.iceberg.MetricsModes.MetricsMode;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SortOrderUtil;
@@ -107,6 +112,95 @@ public final class MetricsConfig implements Serializable {
     return new MetricsConfig(columnModes.build(), defaultMode);
   }
 
+  static Set<Integer> limitFieldIds(Schema schema, int limit) {
+    return TypeUtil.visit(
+        schema,
+        new TypeUtil.CustomOrderSchemaVisitor<>() {
+          private final Set<Integer> idSet = Sets.newHashSet();
+
+          private boolean shouldContinue() {
+            return idSet.size() < limit;
+          }
+
+          private boolean metricsEligible(Type type) {
+            return type.isPrimitiveType() || type.isVariantType();
+          }
+
+          @Override
+          @SuppressWarnings("ReturnValueIgnored")
+          public Set<Integer> schema(Schema schema, Supplier<Set<Integer>> 
structResult) {
+            // We need to call structResult.get() to visit the schema
+            structResult.get();
+            return idSet;
+          }
+
+          @Override
+          public Set<Integer> struct(Types.StructType struct, 
Iterable<Set<Integer>> fieldResults) {
+            Iterator<Types.NestedField> fields = struct.fields().iterator();
+            while (shouldContinue() && fields.hasNext()) {
+              Types.NestedField field = fields.next();
+              if (metricsEligible(field.type())) {
+                idSet.add(field.fieldId());
+              }
+            }
+
+            // visit children to add more ids
+            Iterator<Set<Integer>> iter = fieldResults.iterator();
+            while (shouldContinue() && iter.hasNext()) {
+              iter.next();
+            }
+
+            return null;
+          }
+
+          @Override
+          @SuppressWarnings("ReturnValueIgnored")
+          public Set<Integer> field(Types.NestedField field, 
Supplier<Set<Integer>> fieldResult) {
+            fieldResult.get();
+            return null;
+          }
+
+          @Override
+          @SuppressWarnings("ReturnValueIgnored")
+          public Set<Integer> list(Types.ListType list, Supplier<Set<Integer>> 
elementResult) {
+            if (shouldContinue() && metricsEligible(list.elementType())) {
+              idSet.add(list.elementId());
+            }
+
+            if (shouldContinue()) {
+              elementResult.get();
+            }
+
+            return null;
+          }
+
+          @Override
+          @SuppressWarnings("ReturnValueIgnored")
+          public Set<Integer> map(
+              Types.MapType map,
+              Supplier<Set<Integer>> keyResult,
+              Supplier<Set<Integer>> valueResult) {
+
+            if (shouldContinue() && metricsEligible(map.keyType())) {
+              idSet.add(map.keyId());
+            }
+
+            if (shouldContinue() && metricsEligible(map.valueType())) {
+              idSet.add(map.valueId());
+            }
+
+            if (shouldContinue()) {
+              keyResult.get();
+            }
+
+            if (shouldContinue()) {
+              valueResult.get();
+            }
+            return null;
+          }
+        });
+  }
+
   /**
    * Generate a MetricsConfig for all columns based on overrides, schema, and 
sort order.
    *
@@ -123,23 +217,25 @@ public final class MetricsConfig implements Serializable {
     // Handle user override of default mode
     MetricsMode defaultMode;
     String configuredDefault = props.get(DEFAULT_WRITE_METRICS_MODE);
+
     if (configuredDefault != null) {
       // a user-configured default mode is applied for all columns
       defaultMode = parseMode(configuredDefault, DEFAULT_MODE, "default");
-
-    } else if (schema == null || schema.columns().size() <= 
maxInferredDefaultColumns) {
-      // there are less than the inferred limit, so the default is used 
everywhere
+    } else if (schema == null) {
       defaultMode = DEFAULT_MODE;
-
     } else {
-      // an inferred default mode is applied to the first few columns, up to 
the limit
-      Schema subSchema = new Schema(schema.columns().subList(0, 
maxInferredDefaultColumns));
-      for (Integer id : TypeUtil.getProjectedIds(subSchema)) {
-        columnModes.put(subSchema.findColumnName(id), DEFAULT_MODE);
-      }
+      if (TypeUtil.getProjectedIds(schema).size() <= 
maxInferredDefaultColumns) {
+        // there are less than the inferred limit (including structs), so the 
default is used
+        // everywhere
+        defaultMode = DEFAULT_MODE;
+      } else {
+        for (Integer id : limitFieldIds(schema, maxInferredDefaultColumns)) {
+          columnModes.put(schema.findColumnName(id), DEFAULT_MODE);
+        }
 
-      // all other columns don't use metrics
-      defaultMode = MetricsModes.None.get();
+        // all other columns don't use metrics
+        defaultMode = MetricsModes.None.get();
+      }
     }
 
     // First set sorted column with sorted column default (can be overridden 
by user)
diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsConfig.java 
b/core/src/test/java/org/apache/iceberg/TestMetricsConfig.java
new file mode 100644
index 0000000000..2d5d7d1a1f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestMetricsConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestMetricsConfig {
+
+  @Test
+  public void testNestedStructsRespectedInLimit() {
+    Schema schema =
+        new Schema(
+            required(
+                1,
+                "col_struct",
+                Types.StructType.of(
+                    required(2, "a", Types.IntegerType.get()),
+                    required(3, "b", Types.IntegerType.get()))),
+            required(4, "top", Types.IntegerType.get()));
+
+    assertThat(MetricsConfig.limitFieldIds(schema, 1)).isEqualTo(Set.of(4));
+  }
+
+  @Test
+  public void testNestedMap() {
+    Schema schema =
+        new Schema(
+            required(
+                1,
+                "map",
+                Types.MapType.ofRequired(2, 3, Types.IntegerType.get(), 
Types.IntegerType.get())),
+            required(4, "top", Types.IntegerType.get()));
+
+    assertThat(MetricsConfig.limitFieldIds(schema, 2)).isEqualTo(Set.of(4, 2));
+  }
+
+  @Test
+  public void testNestedListOfMaps() {
+    Schema schema =
+        new Schema(
+            required(
+                1,
+                "array_of_maps",
+                Types.ListType.ofRequired(
+                    2,
+                    Types.MapType.ofRequired(
+                        3, 4, Types.IntegerType.get(), 
Types.IntegerType.get()))),
+            required(5, "top", Types.IntegerType.get()));
+
+    assertThat(MetricsConfig.limitFieldIds(schema, 2)).isEqualTo(Set.of(5, 3));
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java 
b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
index 8f9670d9fc..1d03e342df 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
@@ -22,12 +22,14 @@ import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.iceberg.MetricsModes.Counts;
 import org.apache.iceberg.MetricsModes.Full;
 import org.apache.iceberg.MetricsModes.None;
@@ -196,4 +198,75 @@ public class TestMetricsModes {
     assertThat(config.columnMode("col2")).isEqualTo(Truncate.withLength(16));
     assertThat(config.columnMode("col3")).isEqualTo(None.get());
   }
+
+  @TestTemplate
+  public void testMetricsVariantSupported() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    Schema schema =
+        new Schema(
+            required(1, "variant", Types.VariantType.get()),
+            required(2, "int", Types.IntegerType.get()));
+
+    Table table =
+        TestTables.create(
+            tableDir,
+            "test",
+            schema,
+            PartitionSpec.unpartitioned(),
+            SortOrder.unsorted(),
+            formatVersion);
+
+    // only infer a default for the first column
+    table
+        .updateProperties()
+        .set(TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS, "1")
+        .commit();
+
+    MetricsConfig config = MetricsConfig.forTable(table);
+
+    Map<String, MetricsModes.MetricsMode> metricModes =
+        schema.idToName().values().stream().collect(Collectors.toMap(k -> k, 
config::columnMode));
+
+    assertThat(metricModes)
+        .containsOnly(Map.entry("variant", Truncate.withLength(16)), 
Map.entry("int", None.get()));
+  }
+
+  @TestTemplate
+  public void testMetricsConfigNestedTypesStructs() {
+    Schema schema =
+        new Schema(
+            required(
+                5,
+                "col_struct",
+                Types.StructType.of(
+                    required(33, "a", Types.IntegerType.get()),
+                    required(1, "b", Types.IntegerType.get()))),
+            required(4, "top", Types.IntegerType.get()));
+
+    Table table =
+        TestTables.create(
+            tableDir,
+            "test",
+            schema,
+            PartitionSpec.unpartitioned(),
+            SortOrder.unsorted(),
+            formatVersion);
+
+    // only infer a default for the first two columns
+    table
+        .updateProperties()
+        .set(TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS, "2")
+        .commit();
+
+    MetricsConfig config = MetricsConfig.forTable(table);
+
+    Map<String, MetricsModes.MetricsMode> metricModes =
+        schema.idToName().values().stream().collect(Collectors.toMap(k -> k, 
config::columnMode));
+
+    assertThat(metricModes).containsOnlyKeys("col_struct.a", "col_struct", 
"col_struct.b", "top");
+
+    assertThat(metricModes).containsEntry("col_struct.a", 
Truncate.withLength(16));
+    assertThat(metricModes).containsEntry("col_struct.b", None.get());
+    assertThat(metricModes).containsEntry("top", Truncate.withLength(16));
+  }
 }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java 
b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 7617f7fc49..5665fd0171 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -297,6 +297,43 @@ public abstract class TestWriterMetrics<T> {
     }
   }
 
+  @TestTemplate
+  public void testMaxColumnsBounded() throws IOException {
+    List<Types.NestedField> fields = Arrays.asList(ID_FIELD, DATA_FIELD, 
STRUCT_FIELD);
+
+    Schema maxColSchema = new Schema(fields);
+
+    Table maxColumnTable =
+        TestTables.create(
+            tableDir,
+            "max_col_table",
+            maxColSchema,
+            PartitionSpec.unpartitioned(),
+            SortOrder.unsorted(),
+            FORMAT_V2);
+
+    long maxInferredColumns = 3;
+
+    maxColumnTable
+        .updateProperties()
+        .set(
+            TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS,
+            String.valueOf(maxInferredColumns))
+        .commit();
+
+    OutputFileFactory maxColFactory =
+        OutputFileFactory.builderFor(maxColumnTable, 1, 
1).format(fileFormat).build();
+
+    T row = toRow(1, "data", false, Long.MAX_VALUE);
+    DataWriter<T> dataWriter =
+        newWriterFactory(maxColumnTable)
+            .newDataWriter(maxColFactory.newOutputFile(), 
PartitionSpec.unpartitioned(), null);
+    dataWriter.write(row);
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+    
assertThat(dataFile.upperBounds().keySet().size()).isEqualTo(maxInferredColumns);
+  }
+
   @TestTemplate
   public void testMaxColumnsWithDefaultOverride() throws IOException {
     int numColumns = 
TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT + 1;

Reply via email to