This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9923ac9384 Spark 3.4: Support Spark Column Stats (#11532)
9923ac9384 is described below

commit 9923ac9384900f9644efa4118ef953095f15d0fd
Author: Sai Tharun <[email protected]>
AuthorDate: Thu Nov 14 14:57:19 2024 +0530

    Spark 3.4: Support Spark Column Stats (#11532)
---
 .../org/apache/iceberg/spark/SparkReadConf.java    |   8 +
 .../apache/iceberg/spark/SparkSQLProperties.java   |   4 +
 .../iceberg/spark/source/SparkChangelogScan.java   |   2 +-
 .../spark/source/SparkColumnStatistics.java        |  88 ++++++
 .../org/apache/iceberg/spark/source/SparkScan.java |  60 +++-
 .../org/apache/iceberg/spark/source/Stats.java     |  12 +-
 .../apache/iceberg/spark/source/TestSparkScan.java | 305 +++++++++++++++++++++
 .../apache/iceberg/spark/source/TestSparkScan.java |   2 +-
 8 files changed, 475 insertions(+), 6 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index f0c6b9115c..fdc9347bc3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -351,4 +351,12 @@ public class SparkReadConf {
         .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT)
         .parse();
   }
+
+  public boolean reportColumnStats() {
+    return confParser
+        .booleanConf()
+        .sessionConf(SparkSQLProperties.REPORT_COLUMN_STATS)
+        .defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
+        .parse();
+  }
 }
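
For context, a minimal sketch of how a caller would consult the new flag through SparkReadConf (assumes an existing SparkSession `spark` and Iceberg `table`; the empty option map means no per-read overrides):

    // With no read options set, reportColumnStats() resolves the session
    // property spark.sql.iceberg.report-column-stats and falls back to
    // REPORT_COLUMN_STATS_DEFAULT (true).
    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
    boolean reportStats = readConf.reportColumnStats();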
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index fde8dd29f0..1e8c732d2d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -97,4 +97,8 @@ public class SparkSQLProperties {
   public static final String EXECUTOR_CACHE_LOCALITY_ENABLED =
       "spark.sql.iceberg.executor-cache.locality.enabled";
   public static final boolean EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT = false;
+
+  // Controls whether to report available column statistics to Spark for query optimization.
+  public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
+  public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
 }
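
End to end, the property only matters together with Spark's cost-based optimizer, since SparkScan checks both flags. A hedged sketch (the table name `db.tbl` is a placeholder):

    // Both flags must be on for column stats to reach the optimizer;
    // the Iceberg property already defaults to true.
    spark.conf().set("spark.sql.cbo.enabled", "true");
    spark.conf().set("spark.sql.iceberg.report-column-stats", "true");
    // EXPLAIN COST prints the statistics Spark received for the scan.
    spark.sql("EXPLAIN COST SELECT * FROM db.tbl WHERE id = 1").show(false);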
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 7cde3e1fbe..71b53d7026 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -88,7 +88,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
   public Statistics estimateStatistics() {
     long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
     long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
-    return new Stats(sizeInBytes, rowsCount);
+    return new Stats(sizeInBytes, rowsCount, Collections.emptyMap());
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
new file mode 100644
index 0000000000..faaff3631d
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
+import org.apache.spark.sql.connector.read.colstats.Histogram;
+
+class SparkColumnStatistics implements ColumnStatistics {
+
+  private final OptionalLong distinctCount;
+  private final Optional<Object> min;
+  private final Optional<Object> max;
+  private final OptionalLong nullCount;
+  private final OptionalLong avgLen;
+  private final OptionalLong maxLen;
+  private final Optional<Histogram> histogram;
+
+  SparkColumnStatistics(
+      Long distinctCount,
+      Object min,
+      Object max,
+      Long nullCount,
+      Long avgLen,
+      Long maxLen,
+      Histogram histogram) {
+    this.distinctCount =
+        (distinctCount == null) ? OptionalLong.empty() : OptionalLong.of(distinctCount);
+    this.min = Optional.ofNullable(min);
+    this.max = Optional.ofNullable(max);
+    this.nullCount = (nullCount == null) ? OptionalLong.empty() : OptionalLong.of(nullCount);
+    this.avgLen = (avgLen == null) ? OptionalLong.empty() : OptionalLong.of(avgLen);
+    this.maxLen = (maxLen == null) ? OptionalLong.empty() : OptionalLong.of(maxLen);
+    this.histogram = Optional.ofNullable(histogram);
+  }
+
+  @Override
+  public OptionalLong distinctCount() {
+    return distinctCount;
+  }
+
+  @Override
+  public Optional<Object> min() {
+    return min;
+  }
+
+  @Override
+  public Optional<Object> max() {
+    return max;
+  }
+
+  @Override
+  public OptionalLong nullCount() {
+    return nullCount;
+  }
+
+  @Override
+  public OptionalLong avgLen() {
+    return avgLen;
+  }
+
+  @Override
+  public OptionalLong maxLen() {
+    return maxLen;
+  }
+
+  @Override
+  public Optional<Histogram> histogram() {
+    return histogram;
+  }
+}
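
To illustrate the null-to-Optional mapping above, a small sketch with made-up values (only the NDV is known, as in the scan code below):

    // Absent inputs surface to Spark as empty optionals.
    SparkColumnStatistics colStats =
        new SparkColumnStatistics(4L, null, null, null, null, null, null);
    colStats.distinctCount(); // OptionalLong[4]
    colStats.min();           // Optional.empty
    colStats.nullCount();     // OptionalLong.empty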
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6efe8a080b..019f3919dc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -23,15 +23,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -75,22 +79,28 @@ import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.metric.CustomMetric;
 import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class SparkScan implements Scan, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
+  private static final String NDV_KEY = "ndv";
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final SparkSession spark;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
   private final Schema expectedSchema;
@@ -111,6 +121,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
     SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
 
+    this.spark = spark;
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
@@ -175,7 +186,49 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   protected Statistics estimateStatistics(Snapshot snapshot) {
     // it's a fresh table, no data
     if (snapshot == null) {
-      return new Stats(0L, 0L);
+      return new Stats(0L, 0L, Collections.emptyMap());
+    }
+
+    boolean cboEnabled =
+        Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
+    Map<NamedReference, ColumnStatistics> colStatsMap = Collections.emptyMap();
+    if (readConf.reportColumnStats() && cboEnabled) {
+      colStatsMap = Maps.newHashMap();
+      List<StatisticsFile> files = table.statisticsFiles();
+      if (!files.isEmpty()) {
+        List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+        Map<Integer, List<BlobMetadata>> groupedByField =
+            metadataList.stream()
+                .collect(
+                    Collectors.groupingBy(
+                        metadata -> metadata.fields().get(0), Collectors.toList()));
+
+        for (Map.Entry<Integer, List<BlobMetadata>> entry : groupedByField.entrySet()) {
+          String colName = table.schema().findColumnName(entry.getKey());
+          NamedReference ref = FieldReference.column(colName);
+          Long ndv = null;
+
+          for (BlobMetadata blobMetadata : entry.getValue()) {
+            if (blobMetadata
+                .type()
+                .equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) {
+              String ndvStr = blobMetadata.properties().get(NDV_KEY);
+              if (!Strings.isNullOrEmpty(ndvStr)) {
+                ndv = Long.parseLong(ndvStr);
+              } else {
+                LOG.debug("{} is not set in BlobMetadata for column {}", NDV_KEY, colName);
+              }
+            } else {
+              LOG.debug("Blob type {} is not supported yet", blobMetadata.type());
+            }
+          }
+          ColumnStatistics colStats =
+              new SparkColumnStatistics(ndv, null, null, null, null, null, null);
+
+          colStatsMap.put(ref, colStats);
+        }
+      }
     }
 
     // estimate stats using snapshot summary only for partitioned tables
@@ -186,12 +239,13 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
           snapshot.snapshotId(),
           table.name());
       long totalRecords = totalRecords(snapshot);
-      return new Stats(SparkSchemaUtil.estimateSize(readSchema(), totalRecords), totalRecords);
+      return new Stats(
+          SparkSchemaUtil.estimateSize(readSchema(), totalRecords), totalRecords, colStatsMap);
     }
 
     long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
     long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
-    return new Stats(sizeInBytes, rowsCount);
+    return new Stats(sizeInBytes, rowsCount, colStatsMap);
   }
 
   private long totalRecords(Snapshot snapshot) {
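
The NDVs consumed above come from Puffin statistics files attached to the table metadata. A minimal sketch of registering one, mirroring the tests later in this commit (the path and sizes are placeholders; assumes an existing `table` and its current `snapshotId`):

    import org.apache.iceberg.GenericBlobMetadata;
    import org.apache.iceberg.GenericStatisticsFile;
    import org.apache.iceberg.puffin.StandardBlobTypes;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    // A theta-sketch blob for field ID 1 carrying the "ndv" property that
    // estimateStatistics() reads after grouping blobs by field ID.
    GenericStatisticsFile statsFile =
        new GenericStatisticsFile(
            snapshotId,
            "/test/statistics/file.puffin", // placeholder path
            100, // file size in bytes
            42, // file footer size in bytes
            ImmutableList.of(
                new GenericBlobMetadata(
                    StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
                    snapshotId,
                    1, // source snapshot sequence number
                    ImmutableList.of(1), // field IDs
                    ImmutableMap.of("ndv", "4"))));
    table.updateStatistics().setStatistics(snapshotId, statsFile).commit();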
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
index ddf6ca834d..ccf523cb4b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
@@ -18,16 +18,21 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.Map;
 import java.util.OptionalLong;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
 
 class Stats implements Statistics {
   private final OptionalLong sizeInBytes;
   private final OptionalLong numRows;
+  private final Map<NamedReference, ColumnStatistics> colstats;
 
-  Stats(long sizeInBytes, long numRows) {
+  Stats(long sizeInBytes, long numRows, Map<NamedReference, ColumnStatistics> colstats) {
     this.sizeInBytes = OptionalLong.of(sizeInBytes);
     this.numRows = OptionalLong.of(numRows);
+    this.colstats = colstats;
   }
 
   @Override
@@ -39,4 +44,9 @@ class Stats implements Statistics {
   public OptionalLong numRows() {
     return numRows;
   }
+
+  @Override
+  public Map<NamedReference, ColumnStatistics> columnStats() {
+    return colstats;
+  }
 }
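
With the new constructor, call sites that have no per-column information pass an explicit empty map instead of a null, as in the SparkChangelogScan change above:

    // No column-level stats: report only size and row count.
    Statistics stats = new Stats(sizeInBytes, rowsCount, Collections.emptyMap());
    stats.columnStats().isEmpty(); // true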
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index bc03fa0429..45f5e925ca 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1;
 import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createPartitionedTable;
 import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createUnpartitionedTable;
 import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.timestampStrToDayOrdinal;
@@ -28,10 +29,18 @@ import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.expr;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.functions.BucketFunction;
 import org.apache.iceberg.spark.functions.DaysFunction;
@@ -40,6 +49,7 @@ import org.apache.iceberg.spark.functions.MonthsFunction;
 import org.apache.iceberg.spark.functions.TruncateFunction;
 import org.apache.iceberg.spark.functions.YearsFunction;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
@@ -56,6 +66,8 @@ import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.After;
@@ -68,6 +80,8 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class TestSparkScan extends SparkTestBaseWithCatalog {
 
+  private static final String DUMMY_BLOB_TYPE = "sum-data-size-bytes-v1";
+
   private final String format;
 
   @Parameterized.Parameters(name = "format = {0}")
@@ -112,6 +126,271 @@ public class TestSparkScan extends SparkTestBaseWithCatalog {
     Assert.assertEquals(10000L, stats.numRows().getAsLong());
   }
 
+  @Test
+  public void testTableWithoutColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+    // The expected col NDVs are nulls
+    withSQLConf(
+        reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
+  }
+
+  @Test
+  public void testTableWithoutApacheDatasketchColStat() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    DUMMY_BLOB_TYPE,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("data_size", "4"))));
+
+    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+    // The expected col NDVs are nulls
+    withSQLConf(
+        reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
+  }
+
+  @Test
+  public void testTableWithOneColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("ndv", "4"))));
+
+    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+    Map<String, Long> expectedOneNDV = Maps.newHashMap();
+    expectedOneNDV.put("id", 4L);
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
+  }
+
+  @Test
+  public void testTableWithOneApacheDatasketchColStatAndOneDifferentColStat()
+      throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("ndv", "4")),
+                new GenericBlobMetadata(
+                    DUMMY_BLOB_TYPE,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("data_size", "2"))));
+
+    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+    Map<String, Long> expectedOneNDV = Maps.newHashMap();
+    expectedOneNDV.put("id", 4L);
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
+  }
+
+  @Test
+  public void testTableWithTwoColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("ndv", "4")),
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(2),
+                    ImmutableMap.of("ndv", "2"))));
+
+    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+    Map<String, Long> expectedTwoNDVs = Maps.newHashMap();
+    expectedTwoNDVs.put("id", 4L);
+    expectedTwoNDVs.put("data", 2L);
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedTwoNDVs));
+  }
+
   @Test
   public void testUnpartitionedYears() throws Exception {
     createUnpartitionedTable(spark, tableName);
@@ -716,6 +995,32 @@ public class TestSparkScan extends SparkTestBaseWithCatalog {
     return expressions;
   }
 
+  private void checkColStatisticsNotReported(SparkScan scan, long expectedRowCount) {
+    Statistics stats = scan.estimateStatistics();
+    assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
+
+    Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
+    assertThat(columnStats).isEmpty();
+  }
+
+  private void checkColStatisticsReported(
+      SparkScan scan, long expectedRowCount, Map<String, Long> expectedNDVs) {
+    Statistics stats = scan.estimateStatistics();
+    assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
+
+    Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
+    if (expectedNDVs.isEmpty()) {
+      assertThat(columnStats.values().stream().allMatch(value -> value.distinctCount().isEmpty()))
+          .isTrue();
+    } else {
+      for (Map.Entry<String, Long> entry : expectedNDVs.entrySet()) {
+        assertThat(
+                columnStats.get(FieldReference.column(entry.getKey())).distinctCount().getAsLong())
+            .isEqualTo(entry.getValue());
+      }
+    }
+  }
+
   private static LiteralValue<Integer> intLit(int value) {
     return LiteralValue.apply(value, DataTypes.IntegerType);
   }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index af144fe4bf..dbb15ca5a7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -1018,7 +1018,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
 
     Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
-    assertThat(columnStats.isEmpty());
+    assertThat(columnStats).isEmpty();
   }
 
   private void checkColStatisticsReported(
