This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9923ac9384 Spark 3.4: Support Spark Column Stats (#11532)
9923ac9384 is described below
commit 9923ac9384900f9644efa4118ef953095f15d0fd
Author: Sai Tharun <[email protected]>
AuthorDate: Thu Nov 14 14:57:19 2024 +0530
Spark 3.4: Support Spark Column Stats (#11532)
---
.../org/apache/iceberg/spark/SparkReadConf.java | 8 +
.../apache/iceberg/spark/SparkSQLProperties.java | 4 +
.../iceberg/spark/source/SparkChangelogScan.java | 2 +-
.../spark/source/SparkColumnStatistics.java | 88 ++++++
.../org/apache/iceberg/spark/source/SparkScan.java | 60 +++-
.../org/apache/iceberg/spark/source/Stats.java | 12 +-
.../apache/iceberg/spark/source/TestSparkScan.java | 305 +++++++++++++++++++++
.../apache/iceberg/spark/source/TestSparkScan.java | 2 +-
8 files changed, 475 insertions(+), 6 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index f0c6b9115c..fdc9347bc3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -351,4 +351,12 @@ public class SparkReadConf {
.defaultValue(SparkSQLProperties.EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT)
.parse();
}
+
+ public boolean reportColumnStats() {
+ return confParser
+ .booleanConf()
+ .sessionConf(SparkSQLProperties.REPORT_COLUMN_STATS)
+ .defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
+ .parse();
+ }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index fde8dd29f0..1e8c732d2d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -97,4 +97,8 @@ public class SparkSQLProperties {
public static final String EXECUTOR_CACHE_LOCALITY_ENABLED =
"spark.sql.iceberg.executor-cache.locality.enabled";
public static final boolean EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT = false;
+
+ // Controls whether to report available column statistics to Spark for query optimization.
+ public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
+ public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
}
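For context, the new flag only takes effect together with Spark's cost-based optimizer. A minimal sketch of a session that opts in (the Iceberg property key comes from this commit; the CBO key is Spark's standard spark.sql.cbo.enabled, and the session setup is illustrative):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    // Both settings must be true for SparkScan (below) to report column stats;
    // the Iceberg flag already defaults to true, so setting it is optional.
    spark.conf().set("spark.sql.cbo.enabled", "true");
    spark.conf().set("spark.sql.iceberg.report-column-stats", "true");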
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 7cde3e1fbe..71b53d7026 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -88,7 +88,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
public Statistics estimateStatistics() {
long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
- return new Stats(sizeInBytes, rowsCount);
+ return new Stats(sizeInBytes, rowsCount, Collections.emptyMap());
}
@Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
new file mode 100644
index 0000000000..faaff3631d
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
+import org.apache.spark.sql.connector.read.colstats.Histogram;
+
+class SparkColumnStatistics implements ColumnStatistics {
+
+ private final OptionalLong distinctCount;
+ private final Optional<Object> min;
+ private final Optional<Object> max;
+ private final OptionalLong nullCount;
+ private final OptionalLong avgLen;
+ private final OptionalLong maxLen;
+ private final Optional<Histogram> histogram;
+
+ SparkColumnStatistics(
+ Long distinctCount,
+ Object min,
+ Object max,
+ Long nullCount,
+ Long avgLen,
+ Long maxLen,
+ Histogram histogram) {
+ this.distinctCount =
+ (distinctCount == null) ? OptionalLong.empty() : OptionalLong.of(distinctCount);
+ this.min = Optional.ofNullable(min);
+ this.max = Optional.ofNullable(max);
+ this.nullCount = (nullCount == null) ? OptionalLong.empty() : OptionalLong.of(nullCount);
+ this.avgLen = (avgLen == null) ? OptionalLong.empty() : OptionalLong.of(avgLen);
+ this.maxLen = (maxLen == null) ? OptionalLong.empty() : OptionalLong.of(maxLen);
+ this.histogram = Optional.ofNullable(histogram);
+ }
+
+ @Override
+ public OptionalLong distinctCount() {
+ return distinctCount;
+ }
+
+ @Override
+ public Optional<Object> min() {
+ return min;
+ }
+
+ @Override
+ public Optional<Object> max() {
+ return max;
+ }
+
+ @Override
+ public OptionalLong nullCount() {
+ return nullCount;
+ }
+
+ @Override
+ public OptionalLong avgLen() {
+ return avgLen;
+ }
+
+ @Override
+ public OptionalLong maxLen() {
+ return maxLen;
+ }
+
+ @Override
+ public Optional<Histogram> histogram() {
+ return histogram;
+ }
+}
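As a quick illustration of the null-to-Optional mapping above, an NDV-only instance (the only shape this commit actually builds in SparkScan) would behave like this; the values are illustrative:

    // Hypothetical: only the distinct count is known, everything else is absent.
    SparkColumnStatistics colStats =
        new SparkColumnStatistics(4L, null, null, null, null, null, null);
    colStats.distinctCount(); // OptionalLong[4]
    colStats.min();           // Optional.empty()
    colStats.histogram();     // Optional.empty()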
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6efe8a080b..019f3919dc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -23,15 +23,19 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -75,22 +79,28 @@ import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class SparkScan implements Scan, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
+ private static final String NDV_KEY = "ndv";
private final JavaSparkContext sparkContext;
private final Table table;
+ private final SparkSession spark;
private final SparkReadConf readConf;
private final boolean caseSensitive;
private final Schema expectedSchema;
@@ -111,6 +121,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
+ this.spark = spark;
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
@@ -175,7 +186,49 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
protected Statistics estimateStatistics(Snapshot snapshot) {
// its a fresh table, no data
if (snapshot == null) {
- return new Stats(0L, 0L);
+ return new Stats(0L, 0L, Collections.emptyMap());
+ }
+
+ boolean cboEnabled =
+ Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
+ Map<NamedReference, ColumnStatistics> colStatsMap = Collections.emptyMap();
+ if (readConf.reportColumnStats() && cboEnabled) {
+ colStatsMap = Maps.newHashMap();
+ List<StatisticsFile> files = table.statisticsFiles();
+ if (!files.isEmpty()) {
+ List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+ Map<Integer, List<BlobMetadata>> groupedByField =
+ metadataList.stream()
+ .collect(
+ Collectors.groupingBy(
+ metadata -> metadata.fields().get(0), Collectors.toList()));
+
+ for (Map.Entry<Integer, List<BlobMetadata>> entry : groupedByField.entrySet()) {
+ String colName = table.schema().findColumnName(entry.getKey());
+ NamedReference ref = FieldReference.column(colName);
+ Long ndv = null;
+
+ for (BlobMetadata blobMetadata : entry.getValue()) {
+ if (blobMetadata
+ .type()
+ .equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) {
+ String ndvStr = blobMetadata.properties().get(NDV_KEY);
+ if (!Strings.isNullOrEmpty(ndvStr)) {
+ ndv = Long.parseLong(ndvStr);
+ } else {
+ LOG.debug("{} is not set in BlobMetadata for column {}",
NDV_KEY, colName);
+ }
+ } else {
+ LOG.debug("Blob type {} is not supported yet",
blobMetadata.type());
+ }
+ }
+ ColumnStatistics colStats =
+ new SparkColumnStatistics(ndv, null, null, null, null, null, null);
+
+ colStatsMap.put(ref, colStats);
+ }
+ }
}
// estimate stats using snapshot summary only for partitioned tables
@@ -186,12 +239,13 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
snapshot.snapshotId(),
table.name());
long totalRecords = totalRecords(snapshot);
- return new Stats(SparkSchemaUtil.estimateSize(readSchema(), totalRecords), totalRecords);
+ return new Stats(
+ SparkSchemaUtil.estimateSize(readSchema(), totalRecords), totalRecords, colStatsMap);
}
long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
- return new Stats(sizeInBytes, rowsCount);
+ return new Stats(sizeInBytes, rowsCount, colStatsMap);
}
private long totalRecords(Snapshot snapshot) {
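A condensed sketch of the lookup that estimateStatistics performs above, assuming an Iceberg table variable whose first statistics file holds theta-sketch blobs with an "ndv" property (all APIs are the ones used in the diff; the table variable and surrounding setup are assumed):

    List<StatisticsFile> files = table.statisticsFiles();
    if (!files.isEmpty()) {
      for (BlobMetadata blob : files.get(0).blobMetadata()) {
        if (StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1.equals(blob.type())) {
          int fieldId = blob.fields().get(0);                     // field the sketch covers
          String colName = table.schema().findColumnName(fieldId);
          String ndv = blob.properties().get("ndv");              // distinct-count estimate
        }
      }
    }

Note that only the first statistics file is consulted and only the NDV is propagated; min/max, null counts, and histograms are left empty in the resulting SparkColumnStatistics.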
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
index ddf6ca834d..ccf523cb4b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
@@ -18,16 +18,21 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.Map;
import java.util.OptionalLong;
+import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
class Stats implements Statistics {
private final OptionalLong sizeInBytes;
private final OptionalLong numRows;
+ private final Map<NamedReference, ColumnStatistics> colstats;
- Stats(long sizeInBytes, long numRows) {
+ Stats(long sizeInBytes, long numRows, Map<NamedReference, ColumnStatistics> colstats) {
this.sizeInBytes = OptionalLong.of(sizeInBytes);
this.numRows = OptionalLong.of(numRows);
+ this.colstats = colstats;
}
@Override
@@ -39,4 +44,9 @@ class Stats implements Statistics {
public OptionalLong numRows() {
return numRows;
}
+
+ @Override
+ public Map<NamedReference, ColumnStatistics> columnStats() {
+ return colstats;
+ }
}
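Putting the pieces together, a sketch of how a Stats instance carrying column statistics might be assembled from within this package (the classes are the ones in this commit; the values are illustrative):

    Statistics stats =
        new Stats(
            1024L, // estimated size in bytes
            4L,    // estimated row count
            ImmutableMap.of(
                FieldReference.column("id"),
                new SparkColumnStatistics(4L, null, null, null, null, null, null)));
    stats.columnStats(); // {id -> column stats with NDV 4}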
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index bc03fa0429..45f5e925ca 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;
+import static org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1;
import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createPartitionedTable;
import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createUnpartitionedTable;
import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.timestampStrToDayOrdinal;
@@ -28,10 +29,18 @@ import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.functions.BucketFunction;
import org.apache.iceberg.spark.functions.DaysFunction;
@@ -40,6 +49,7 @@ import org.apache.iceberg.spark.functions.MonthsFunction;
import org.apache.iceberg.spark.functions.TruncateFunction;
import org.apache.iceberg.spark.functions.YearsFunction;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
@@ -56,6 +66,8 @@ import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.After;
@@ -68,6 +80,8 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestSparkScan extends SparkTestBaseWithCatalog {
+ private static final String DUMMY_BLOB_TYPE = "sum-data-size-bytes-v1";
+
private final String format;
@Parameterized.Parameters(name = "format = {0}")
@@ -112,6 +126,271 @@ public class TestSparkScan extends SparkTestBaseWithCatalog {
Assert.assertEquals(10000L, stats.numRows().getAsLong());
}
+ @Test
+ public void testTableWithoutColStats() throws NoSuchTableException {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+ List<SimpleRecord> records =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "a"),
+ new SimpleRecord(4, "b"));
+ spark
+ .createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SparkScanBuilder scanBuilder =
+ new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ SparkScan scan = (SparkScan) scanBuilder.build();
+
+ Map<String, String> reportColStatsDisabled =
+ ImmutableMap.of(
+ SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+ Map<String, String> reportColStatsEnabled =
+ ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+ checkColStatisticsNotReported(scan, 4L);
+ withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+ // The expected col NDVs are nulls
+ withSQLConf(
+ reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
+ }
+
+ @Test
+ public void testTableWithoutApacheDatasketchColStat() throws NoSuchTableException {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+ List<SimpleRecord> records =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "a"),
+ new SimpleRecord(4, "b"));
+ spark
+ .createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ SparkScanBuilder scanBuilder =
+ new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ SparkScan scan = (SparkScan) scanBuilder.build();
+
+ Map<String, String> reportColStatsDisabled =
+ ImmutableMap.of(
+ SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+ Map<String, String> reportColStatsEnabled =
+ ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+ GenericStatisticsFile statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId,
+ "/test/statistics/file.puffin",
+ 100,
+ 42,
+ ImmutableList.of(
+ new GenericBlobMetadata(
+ DUMMY_BLOB_TYPE,
+ snapshotId,
+ 1,
+ ImmutableList.of(1),
+ ImmutableMap.of("data_size", "4"))));
+
+ table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+ checkColStatisticsNotReported(scan, 4L);
+ withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+ // The expected col NDVs are nulls
+ withSQLConf(
+ reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
+ }
+
+ @Test
+ public void testTableWithOneColStats() throws NoSuchTableException {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+ List<SimpleRecord> records =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "a"),
+ new SimpleRecord(4, "b"));
+ spark
+ .createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ SparkScanBuilder scanBuilder =
+ new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ SparkScan scan = (SparkScan) scanBuilder.build();
+
+ Map<String, String> reportColStatsDisabled =
+ ImmutableMap.of(
+ SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+ Map<String, String> reportColStatsEnabled =
+ ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+ GenericStatisticsFile statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId,
+ "/test/statistics/file.puffin",
+ 100,
+ 42,
+ ImmutableList.of(
+ new GenericBlobMetadata(
+ APACHE_DATASKETCHES_THETA_V1,
+ snapshotId,
+ 1,
+ ImmutableList.of(1),
+ ImmutableMap.of("ndv", "4"))));
+
+ table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+ checkColStatisticsNotReported(scan, 4L);
+ withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+ Map<String, Long> expectedOneNDV = Maps.newHashMap();
+ expectedOneNDV.put("id", 4L);
+ withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
+ }
+
+ @Test
+ public void testTableWithOneApacheDatasketchColStatAndOneDifferentColStat()
+ throws NoSuchTableException {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+ List<SimpleRecord> records =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "a"),
+ new SimpleRecord(4, "b"));
+ spark
+ .createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ SparkScanBuilder scanBuilder =
+ new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ SparkScan scan = (SparkScan) scanBuilder.build();
+
+ Map<String, String> reportColStatsDisabled =
+ ImmutableMap.of(
+ SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+ Map<String, String> reportColStatsEnabled =
+ ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+ GenericStatisticsFile statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId,
+ "/test/statistics/file.puffin",
+ 100,
+ 42,
+ ImmutableList.of(
+ new GenericBlobMetadata(
+ APACHE_DATASKETCHES_THETA_V1,
+ snapshotId,
+ 1,
+ ImmutableList.of(1),
+ ImmutableMap.of("ndv", "4")),
+ new GenericBlobMetadata(
+ DUMMY_BLOB_TYPE,
+ snapshotId,
+ 1,
+ ImmutableList.of(1),
+ ImmutableMap.of("data_size", "2"))));
+
+ table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+ checkColStatisticsNotReported(scan, 4L);
+ withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+ Map<String, Long> expectedOneNDV = Maps.newHashMap();
+ expectedOneNDV.put("id", 4L);
+ withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
+ }
+
+ @Test
+ public void testTableWithTwoColStats() throws NoSuchTableException {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+ List<SimpleRecord> records =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "a"),
+ new SimpleRecord(4, "b"));
+ spark
+ .createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ SparkScanBuilder scanBuilder =
+ new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ SparkScan scan = (SparkScan) scanBuilder.build();
+
+ Map<String, String> reportColStatsDisabled =
+ ImmutableMap.of(
+ SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+ Map<String, String> reportColStatsEnabled =
+ ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+ GenericStatisticsFile statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId,
+ "/test/statistics/file.puffin",
+ 100,
+ 42,
+ ImmutableList.of(
+ new GenericBlobMetadata(
+ APACHE_DATASKETCHES_THETA_V1,
+ snapshotId,
+ 1,
+ ImmutableList.of(1),
+ ImmutableMap.of("ndv", "4")),
+ new GenericBlobMetadata(
+ APACHE_DATASKETCHES_THETA_V1,
+ snapshotId,
+ 1,
+ ImmutableList.of(2),
+ ImmutableMap.of("ndv", "2"))));
+
+ table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+ checkColStatisticsNotReported(scan, 4L);
+ withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+ Map<String, Long> expectedTwoNDVs = Maps.newHashMap();
+ expectedTwoNDVs.put("id", 4L);
+ expectedTwoNDVs.put("data", 2L);
+ withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedTwoNDVs));
+ }
+
@Test
public void testUnpartitionedYears() throws Exception {
createUnpartitionedTable(spark, tableName);
@@ -716,6 +995,32 @@ public class TestSparkScan extends SparkTestBaseWithCatalog {
return expressions;
}
+ private void checkColStatisticsNotReported(SparkScan scan, long expectedRowCount) {
+ Statistics stats = scan.estimateStatistics();
+ assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
+
+ Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
+ assertThat(columnStats).isEmpty();
+ }
+
+ private void checkColStatisticsReported(
+ SparkScan scan, long expectedRowCount, Map<String, Long> expectedNDVs) {
+ Statistics stats = scan.estimateStatistics();
+ assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
+
+ Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
+ if (expectedNDVs.isEmpty()) {
+ assertThat(columnStats.values().stream().allMatch(value -> value.distinctCount().isEmpty()))
+ .isTrue();
+ } else {
+ for (Map.Entry<String, Long> entry : expectedNDVs.entrySet()) {
+ assertThat(
+ columnStats.get(FieldReference.column(entry.getKey())).distinctCount().getAsLong())
+ .isEqualTo(entry.getValue());
+ }
+ }
+ }
+
private static LiteralValue<Integer> intLit(int value) {
return LiteralValue.apply(value, DataTypes.IntegerType);
}
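For readability, the GenericBlobMetadata arguments used throughout the tests above are, in order (argument names per the Iceberg API; the inline comments are annotations of the test values, not new code):

    new GenericBlobMetadata(
        APACHE_DATASKETCHES_THETA_V1, // blob type checked by SparkScan
        snapshotId,                   // source snapshot id
        1,                            // source snapshot sequence number
        ImmutableList.of(1),          // field ids the sketch covers ("id" is field 1)
        ImmutableMap.of("ndv", "4")); // property SparkScan reads as the distinct count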
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index af144fe4bf..dbb15ca5a7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -1018,7 +1018,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
- assertThat(columnStats.isEmpty());
+ assertThat(columnStats).isEmpty();
}
private void checkColStatisticsReported(