This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 58b3e36731 Spark 4.0: Display write metrics on SQL UI (#15468)
58b3e36731 is described below
commit 58b3e3673107e9cb474213d3b66348ceb3b206d4
Author: Manu Zhang <[email protected]>
AuthorDate: Mon Mar 2 18:27:59 2026 +0800
Spark 4.0: Display write metrics on SQL UI (#15468)
Back-port of https://github.com/apache/iceberg/pull/15104
---
.../org/apache/iceberg/spark/SparkWriteUtil.java | 109 ++++++++++
.../spark/source/SparkPositionDeletesRewrite.java | 21 ++
.../spark/source/SparkPositionDeltaWrite.java | 21 ++
.../apache/iceberg/spark/source/SparkWrite.java | 21 ++
.../{SkippedDataFiles.java => AddedDataFiles.java} | 6 +-
...SkippedDataFiles.java => AddedDeleteFiles.java} | 6 +-
...eteFiles.java => AddedEqualityDeleteFiles.java} | 6 +-
...yDeleteFiles.java => AddedEqualityDeletes.java} | 6 +-
...yDeleteFiles.java => AddedFileSizeInBytes.java} | 6 +-
...eFiles.java => AddedPositionalDeleteFiles.java} | 6 +-
...eleteFiles.java => AddedPositionalDeletes.java} | 6 +-
.../{TotalDataManifests.java => AddedRecords.java} | 6 +-
.../spark/source/metrics/EqualityDeleteFiles.java | 2 +-
.../spark/source/metrics/IndexedDeleteFiles.java | 2 +-
.../source/metrics/PositionalDeleteFiles.java | 2 +-
...SkippedDataFiles.java => RemovedDataFiles.java} | 6 +-
...ityDeleteFiles.java => RemovedDeleteFiles.java} | 6 +-
...eFiles.java => RemovedEqualityDeleteFiles.java} | 6 +-
...eleteFiles.java => RemovedEqualityDeletes.java} | 6 +-
...eleteFiles.java => RemovedFileSizeInBytes.java} | 6 +-
...iles.java => RemovedPositionalDeleteFiles.java} | 6 +-
...eteFiles.java => RemovedPositionalDeletes.java} | 6 +-
.../{SkippedDataFiles.java => RemovedRecords.java} | 6 +-
.../spark/source/metrics/ResultDataFiles.java | 2 +-
.../spark/source/metrics/ResultDeleteFiles.java | 2 +-
.../spark/source/metrics/ScannedDataManifests.java | 2 +-
.../source/metrics/ScannedDeleteManifests.java | 2 +-
.../spark/source/metrics/SkippedDataFiles.java | 2 +-
.../spark/source/metrics/SkippedDataManifests.java | 2 +-
.../spark/source/metrics/SkippedDeleteFiles.java | 2 +-
.../source/metrics/SkippedDeleteManifests.java | 2 +-
.../spark/source/metrics/TotalDataFileSize.java | 2 +-
.../{SkippedDataFiles.java => TotalDataFiles.java} | 6 +-
.../spark/source/metrics/TotalDataManifests.java | 2 +-
.../spark/source/metrics/TotalDeleteFileSize.java | 2 +-
...SkippedDataFiles.java => TotalDeleteFiles.java} | 6 +-
.../spark/source/metrics/TotalDeleteManifests.java | 2 +-
...yDeleteFiles.java => TotalEqualityDeletes.java} | 6 +-
...DataFileSize.java => TotalFileSizeInBytes.java} | 4 +-
.../source/metrics/TotalPlanningDuration.java | 2 +-
...eleteFiles.java => TotalPositionalDeletes.java} | 6 +-
.../{TotalDataManifests.java => TotalRecords.java} | 6 +-
.../spark/source/TestSparkWriteMetrics.java | 223 +++++++++++++++++++++
43 files changed, 476 insertions(+), 81 deletions(-)
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
index 0d68a0d8cd..86902c15e1 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
@@ -23,10 +23,38 @@ import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MER
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
import java.util.Arrays;
+import java.util.List;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
+import org.apache.iceberg.spark.source.metrics.AddedDataFiles;
+import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedRecords;
+import org.apache.iceberg.spark.source.metrics.RemovedDataFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedRecords;
+import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
+import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalRecords;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SortOrderUtil;
import org.apache.spark.sql.connector.distributions.Distribution;
@@ -36,6 +64,8 @@ import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
/**
@@ -256,4 +286,83 @@ public class SparkWriteUtil {
private static SortOrder sort(Expression expr) {
return Expressions.sort(expr, SortDirection.ASCENDING);
}
+
+ public static CustomMetric[] supportedCustomMetrics() { // Spark metric descriptors for Iceberg commit metrics
+ return new CustomMetric[] { // new array per call; keep in the same order as customTaskMetrics(...)
+ new AddedDataFiles(),
+ new AddedDeleteFiles(),
+ new AddedEqualityDeletes(),
+ new AddedEqualityDeleteFiles(),
+ new AddedFileSizeInBytes(),
+ new AddedPositionalDeletes(),
+ new AddedPositionalDeleteFiles(),
+ new AddedRecords(),
+ new RemovedDataFiles(),
+ new RemovedDeleteFiles(),
+ new RemovedRecords(), // listed before the removed delete-file metrics, matching customTaskMetrics(...)
+ new RemovedEqualityDeleteFiles(),
+ new RemovedEqualityDeletes(),
+ new RemovedFileSizeInBytes(),
+ new RemovedPositionalDeleteFiles(),
+ new RemovedPositionalDeletes(),
+ new TotalDataFiles(),
+ new TotalDeleteFiles(),
+ new TotalEqualityDeletes(),
+ new TotalFileSizeInBytes(),
+ new TotalPositionalDeletes(),
+ new TotalRecords()
+ };
+ }
+
+ public static CustomTaskMetric[] customTaskMetrics(InMemoryMetricsReporter
metricsReporter) { // converts the reporter's commit report into Spark driver task metrics
+ List<CustomTaskMetric> metrics = Lists.newArrayList(); // returned empty when there is nothing to report
+ if (metricsReporter != null) { // the write classes leave the reporter null for non-BaseTable tables
+ CommitReport commitReport = metricsReporter.commitReport(); // presumably null until a commit has been reported — confirm
+ if (commitReport != null) {
+ CommitMetricsResult result = commitReport.commitMetrics(); // individual counters may be null; addValue skips them
+ addValue(new AddedDataFiles(), result.addedDataFiles(), metrics); // same order as supportedCustomMetrics()
+ addValue(new AddedDeleteFiles(), result.addedDeleteFiles(), metrics);
+ addValue(new AddedEqualityDeletes(), result.addedEqualityDeletes(),
metrics);
+ addValue(new AddedEqualityDeleteFiles(),
result.addedEqualityDeleteFiles(), metrics);
+ addValue(new AddedFileSizeInBytes(), result.addedFilesSizeInBytes(),
metrics);
+ addValue(new AddedPositionalDeletes(),
result.addedPositionalDeletes(), metrics);
+ addValue(new AddedPositionalDeleteFiles(),
result.addedPositionalDeleteFiles(), metrics);
+ addValue(new AddedRecords(), result.addedRecords(), metrics);
+ addValue(new RemovedDataFiles(), result.removedDataFiles(), metrics);
+ addValue(new RemovedDeleteFiles(), result.removedDeleteFiles(),
metrics);
+ addValue(new RemovedRecords(), result.removedRecords(), metrics);
+ addValue(new RemovedEqualityDeleteFiles(),
result.removedEqualityDeleteFiles(), metrics);
+ addValue(new RemovedEqualityDeletes(),
result.removedEqualityDeletes(), metrics);
+ addValue(new RemovedFileSizeInBytes(),
result.removedFilesSizeInBytes(), metrics);
+ addValue(
+ new RemovedPositionalDeleteFiles(),
result.removedPositionalDeleteFiles(), metrics);
+ addValue(new RemovedPositionalDeletes(),
result.removedPositionalDeletes(), metrics);
+ addValue(new TotalDataFiles(), result.totalDataFiles(), metrics);
+ addValue(new TotalDeleteFiles(), result.totalDeleteFiles(), metrics);
+ addValue(new TotalEqualityDeletes(), result.totalEqualityDeletes(),
metrics);
+ addValue(new TotalFileSizeInBytes(), result.totalFilesSizeInBytes(),
metrics);
+ addValue(new TotalPositionalDeletes(),
result.totalPositionalDeletes(), metrics);
+ addValue(new TotalRecords(), result.totalRecords(), metrics);
+ }
+ }
+ return metrics.toArray(new CustomTaskMetric[0]); // one entry per non-null counter
+ }
+
+ private static void addValue( // appends a task metric named after `metric` carrying the counter's value
+ CustomMetric metric, CounterResult result, List<CustomTaskMetric>
taskMetrics) {
+ if (result != null) { // absent counters are omitted rather than reported as 0
+ taskMetrics.add(
+ new CustomTaskMetric() { // adapter exposing the metric name and counter value through Spark's interface
+ @Override
+ public String name() {
+ return metric.name(); // must match the name of the CustomMetric declared in supportedCustomMetrics()
+ }
+
+ @Override
+ public long value() {
+ return result.value(); // read from the captured CounterResult when Spark calls value()
+ }
+ });
+ }
+ }
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 0ec7084bfd..3bbea7b5c9 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
@@ -37,16 +38,20 @@ import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningDVWriter;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.spark.SparkWriteUtil;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -80,6 +85,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;
+ private InMemoryMetricsReporter metricsReporter;
/**
* Constructs a {@link SparkPositionDeletesRewrite}.
@@ -114,6 +120,11 @@ public class SparkPositionDeletesRewrite implements Write {
this.specId = specId;
this.partition = partition;
this.writeProperties = writeConf.writeProperties();
+
+ if (this.table instanceof BaseTable) {
+ this.metricsReporter = new InMemoryMetricsReporter();
+ ((BaseTable) this.table).combineMetricsReporter(metricsReporter);
+ }
}
@Override
@@ -121,6 +132,16 @@ public class SparkPositionDeletesRewrite implements Write {
return new PositionDeleteBatchWrite();
}
+ @Override
+ public CustomTaskMetric[] reportDriverMetrics() { // commit metrics captured by metricsReporter; empty if none
+ return SparkWriteUtil.customTaskMetrics(metricsReporter);
+ }
+
+ @Override
+ public CustomMetric[] supportedCustomMetrics() { // declares the write metrics shown in the Spark SQL UI
+ return SparkWriteUtil.supportedCustomMetrics();
+ }
+
/** {@link BatchWrite} class for rewriting position deletes files from Spark
*/
class PositionDeleteBatchWrite implements BatchWrite {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index d072397dc6..a1cb31bd37 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -66,12 +67,14 @@ import org.apache.iceberg.io.PartitioningDVWriter;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.PositionDeltaWriter;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.spark.SparkWriteUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DeleteFileSet;
@@ -83,6 +86,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.write.DeltaBatchWrite;
import org.apache.spark.sql.connector.write.DeltaWrite;
import org.apache.spark.sql.connector.write.DeltaWriter;
@@ -116,6 +121,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
private final Map<String, String> writeProperties;
private boolean cleanupOnAbort = false;
+ private InMemoryMetricsReporter metricsReporter;
SparkPositionDeltaWrite(
SparkSession spark,
@@ -139,6 +145,11 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties();
+
+ if (this.table instanceof BaseTable) {
+ this.metricsReporter = new InMemoryMetricsReporter();
+ ((BaseTable) this.table).combineMetricsReporter(metricsReporter);
+ }
}
@Override
@@ -172,6 +183,16 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
return new PositionDeltaBatchWrite();
}
+ @Override
+ public CustomMetric[] supportedCustomMetrics() { // declares the write metrics shown in the Spark SQL UI
+ return SparkWriteUtil.supportedCustomMetrics();
+ }
+
+ @Override
+ public CustomTaskMetric[] reportDriverMetrics() { // commit metrics captured by metricsReporter; empty if none
+ return SparkWriteUtil.customTaskMetrics(metricsReporter);
+ }
+
private class PositionDeltaBatchWrite implements DeltaBatchWrite {
@Override
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index c9a94090ef..5f81689f41 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -53,12 +54,14 @@ import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.RollingDataWriter;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.spark.SparkWriteUtil;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
@@ -72,6 +75,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -108,6 +113,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
private final Map<String, String> writeProperties;
private boolean cleanupOnAbort = false;
+ private InMemoryMetricsReporter metricsReporter;
SparkWrite(
SparkSession spark,
@@ -135,6 +141,11 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.writeRequirements = writeRequirements;
this.outputSpecId = writeConf.outputSpecId();
this.writeProperties = writeConf.writeProperties();
+
+ if (this.table instanceof BaseTable) {
+ this.metricsReporter = new InMemoryMetricsReporter();
+ ((BaseTable) this.table).combineMetricsReporter(metricsReporter);
+ }
}
@Override
@@ -163,6 +174,11 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
return size;
}
+ @Override
+ public CustomMetric[] supportedCustomMetrics() { // declares the write metrics shown in the Spark SQL UI
+ return SparkWriteUtil.supportedCustomMetrics();
+ }
+
BatchWrite asBatchAppend() {
return new BatchAppend();
}
@@ -265,6 +281,11 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
return files;
}
+ @Override
+ public CustomTaskMetric[] reportDriverMetrics() { // NOTE(review): the ctor combines a new reporter into the table's; confirm reused tables don't accumulate them
+ return SparkWriteUtil.customTaskMetrics(metricsReporter); // empty when metricsReporter is null or has no commit report
+ }
+
@Override
public String toString() {
return String.format("IcebergWrite(table=%s, format=%s)", table, format);
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java
similarity index 87%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java
index 7fd1742531..70df1ca2ca 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class SkippedDataFiles extends CustomSumMetric {
+public class AddedDataFiles extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "addedDataFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of skipped data files";
+ return "number of added data files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java
similarity index 86%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java
index 7fd1742531..381e912407 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class SkippedDataFiles extends CustomSumMetric {
+public class AddedDeleteFiles extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "addedDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of skipped data files";
+ return "number of added delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java
index 754145f7d2..bd653803fa 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedEqualityDeleteFiles extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "addedEqualityDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of added equality delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java
index 754145f7d2..c5ee8c952d 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedEqualityDeletes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "addedEqualityDeletes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of added equality deletes records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java
similarity index 86%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java
index 754145f7d2..8e5a16dbf2 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedFileSizeInBytes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "addedFileSizeInBytes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "total size of added files (bytes)";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java
similarity index 84%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java
index 754145f7d2..d9d5501a80 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedPositionalDeleteFiles extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "addedPositionalDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of added positional delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java
index 754145f7d2..bd8995eac6 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedPositionalDeletes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "addedPositionalDeletes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of added positional deletes records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java
similarity index 87%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java
index de8f04be77..240ec5b34b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class TotalDataManifests extends CustomSumMetric {
+public class AddedRecords extends CustomSumMetric {
- static final String NAME = "totalDataManifest";
+ public static final String NAME = "addedRecords";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class TotalDataManifests extends CustomSumMetric {
@Override
public String description() {
- return "total data manifests";
+ return "number of added records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
index 754145f7d2..4c258c01c9 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class EqualityDeleteFiles extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "equalityDeleteFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
index 7fc5b9066c..93dd410b2a 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class IndexedDeleteFiles extends CustomSumMetric {
- static final String NAME = "indexedDeleteFiles";
+ public static final String NAME = "indexedDeleteFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
index 5de75776ea..f362d33535 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class PositionalDeleteFiles extends CustomSumMetric {
- static final String NAME = "positionalDeleteFiles";
+ public static final String NAME = "positionalDeleteFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java
similarity index 86%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java
index 7fd1742531..96e21e96c2 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class SkippedDataFiles extends CustomSumMetric {
+public class RemovedDataFiles extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "removedDataFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of skipped data files";
+ return "number of removed data files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java
similarity index 86%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java
index 754145f7d2..9e1267592f 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedDeleteFiles extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "removedDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of removed delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java
similarity index 84%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java
index 754145f7d2..07c0de3325 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedEqualityDeleteFiles extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "removedEqualityDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of removed equality delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java
index 754145f7d2..2ad7f5a344 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedEqualityDeletes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "removedEqualityDeletes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of removed equality deletes records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java
index 754145f7d2..b7ec8b5195 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedFileSizeInBytes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "removedFileSizeInBytes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "total size of removed files (bytes)";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java
similarity index 84%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java
index 754145f7d2..d01529753b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedPositionalDeleteFiles extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "removedPositionalDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of removed positional delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java
similarity index 84%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java
index 754145f7d2..b99892b211 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedPositionalDeletes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "removedPositionalDeletes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "number of removed positional deletes records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java
similarity index 87%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java
index 7fd1742531..b186aff8e0 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class SkippedDataFiles extends CustomSumMetric {
+public class RemovedRecords extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "removedRecords";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of skipped data files";
+ return "number of removed records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
index 21959cbf6c..af75b746e4 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class ResultDataFiles extends CustomSumMetric {
- static final String NAME = "resultDataFiles";
+ public static final String NAME = "resultDataFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
index 9c6ad2ca32..54d7afac81 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class ResultDeleteFiles extends CustomSumMetric {
- static final String NAME = "resultDeleteFiles";
+ public static final String NAME = "resultDeleteFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
index a167904280..f52efec55b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class ScannedDataManifests extends CustomSumMetric {
- static final String NAME = "scannedDataManifests";
+ public static final String NAME = "scannedDataManifests";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
index 1fa006b7b1..8f11eac2f2 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class ScannedDeleteManifests extends CustomSumMetric {
- static final String NAME = "scannedDeleteManifests";
+ public static final String NAME = "scannedDeleteManifests";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
index 7fd1742531..0e57eb31ea 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class SkippedDataFiles extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "skippedDataFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
index b0eaeb5d87..a02644643b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class SkippedDataManifests extends CustomSumMetric {
- static final String NAME = "skippedDataManifests";
+ public static final String NAME = "skippedDataManifests";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
index 70597be671..517415a519 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class SkippedDeleteFiles extends CustomSumMetric {
- static final String NAME = "skippedDeleteFiles";
+ public static final String NAME = "skippedDeleteFiles";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
index 0336170b45..c76aa28834 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class SkippedDeleteManifests extends CustomSumMetric {
- static final String NAME = "skippedDeleteManifests";
+ public static final String NAME = "skippedDeleteManifests";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
index b1ff8a4636..2f93dcabb0 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class TotalDataFileSize extends CustomSumMetric {
- static final String NAME = "totalDataFileSize";
+ public static final String NAME = "totalDataFileSize";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
similarity index 87%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
index 7fd1742531..a18126db7a 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class SkippedDataFiles extends CustomSumMetric {
+public class TotalDataFiles extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "totalDataFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of skipped data files";
+ return "total number of data files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
index de8f04be77..33a0656dcf 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class TotalDataManifests extends CustomSumMetric {
- static final String NAME = "totalDataManifest";
+ public static final String NAME = "totalDataManifest";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
index da43033252..d374a23df1 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class TotalDeleteFileSize extends CustomSumMetric {
- static final String NAME = "totalDeleteFileSize";
+ public static final String NAME = "totalDeleteFileSize";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java
similarity index 86%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java
index 7fd1742531..66e9416253 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class SkippedDataFiles extends CustomSumMetric {
+public class TotalDeleteFiles extends CustomSumMetric {
- static final String NAME = "skippedDataFiles";
+ public static final String NAME = "totalDeleteFiles";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of skipped data files";
+ return "total number of delete files";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
index 7442dfdb6f..58ac739ea3 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class TotalDeleteManifests extends CustomSumMetric {
- static final String NAME = "totalDeleteManifests";
+ public static final String NAME = "totalDeleteManifests";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java
index 754145f7d2..fff664cc9b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class TotalEqualityDeletes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "totalEqualityDeletes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "total number of equality deletes records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java
similarity index 89%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java
index b1ff8a4636..ec97bfec55 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class TotalDataFileSize extends CustomSumMetric {
+public class TotalFileSizeInBytes extends CustomSumMetric {
- static final String NAME = "totalDataFileSize";
+ public static final String NAME = "totalFileSizeInBytes";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
index 8b66eeac40..f1051bd928 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class TotalPlanningDuration extends CustomSumMetric {
- static final String NAME = "totalPlanningDuration";
+ public static final String NAME = "totalPlanningDuration";
@Override
public String name() {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java
similarity index 85%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java
index 754145f7d2..da339688a3 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class TotalPositionalDeletes extends CustomSumMetric {
- static final String NAME = "equalityDeleteFiles";
+ public static final String NAME = "totalPositionalDeletes";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
@Override
public String description() {
- return "number of equality delete files";
+ return "total number of positional deletes records";
}
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java
similarity index 87%
copy from
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
copy to
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java
index de8f04be77..3cc1123343 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
-public class TotalDataManifests extends CustomSumMetric {
+public class TotalRecords extends CustomSumMetric {
- static final String NAME = "totalDataManifest";
+ public static final String NAME = "totalRecords";
@Override
public String name() {
@@ -31,6 +31,6 @@ public class TotalDataManifests extends CustomSumMetric {
@Override
public String description() {
- return "total data manifests";
+ return "total number of records";
}
}
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java
new file mode 100644
index 0000000000..661aac1372
--- /dev/null
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
+import org.apache.iceberg.spark.source.metrics.AddedDataFiles;
+import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedRecords;
+import org.apache.iceberg.spark.source.metrics.RemovedDataFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedRecords;
+import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
+import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalRecords;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.jdk.javaapi.CollectionConverters;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkWriteMetrics extends TestBaseWithCatalog {
+
+ @AfterEach
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @TestTemplate
+ public void writeMetrics() {
+ sql("CREATE TABLE %s (id BIGINT) USING iceberg", tableName);
+
+ String insertSql = String.format("INSERT INTO %s SELECT id FROM
range(1000)", tableName);
+ Dataset<Row> result = spark.sql(insertSql);
+ result.collect();
+
+ SparkPlan plan = result.queryExecution().executedPlan();
+ Map<String, SQLMetric> metricsMap =
CollectionConverters.asJava(plan.metrics());
+
+ // If we are at the root, check if we have the metrics.
+ // Sometimes the plan structure is complex (e.g. AdaptiveSparkPlanExec).
+ // We might want to find the specific write node.
+
+ if (!metricsMap.containsKey(AddedDataFiles.NAME)) {
+ // Attempt to find a node with these metrics
+ metricsMap = findMetrics(plan, AddedDataFiles.NAME);
+ }
+
+ assertThat(metricsMap).isNotNull();
+ assertThat(metricsMap)
+ .hasEntrySatisfying(AddedDataFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(2));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ AddedRecords.NAME, metric ->
assertThat(metric.value()).isEqualTo(1000));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ AddedFileSizeInBytes.NAME, metric ->
assertThat(metric.value()).isGreaterThan(0));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(TotalDataFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(2));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ TotalRecords.NAME, metric ->
assertThat(metric.value()).isEqualTo(1000));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ TotalFileSizeInBytes.NAME, metric ->
assertThat(metric.value()).isGreaterThan(0));
+
+ // Verify other metrics are 0
+ String[] zeroMetrics = {
+ AddedDeleteFiles.NAME,
+ AddedEqualityDeleteFiles.NAME,
+ AddedPositionalDeleteFiles.NAME,
+ AddedEqualityDeletes.NAME,
+ AddedPositionalDeletes.NAME,
+ RemovedDataFiles.NAME,
+ RemovedDeleteFiles.NAME,
+ RemovedEqualityDeleteFiles.NAME,
+ RemovedPositionalDeleteFiles.NAME,
+ RemovedEqualityDeletes.NAME,
+ RemovedPositionalDeletes.NAME,
+ RemovedRecords.NAME,
+ RemovedFileSizeInBytes.NAME,
+ TotalDeleteFiles.NAME,
+ TotalEqualityDeletes.NAME,
+ TotalPositionalDeletes.NAME
+ };
+
+ for (String metric : zeroMetrics) {
+ assertThat(metricsMap)
+ .hasEntrySatisfying(metric, m ->
assertThat(m.value()).as(metric).isEqualTo(0));
+ }
+ }
+
+ @TestTemplate
+ public void deleteMetrics() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES
('write.delete.mode'='merge-on-read')",
+ tableName);
+
+ spark.range(100).coalesce(1).writeTo(tableName).append();
+
+ String deleteSql = String.format("DELETE FROM %s WHERE id = 1", tableName);
+ Dataset<Row> result = spark.sql(deleteSql);
+ result.collect();
+
+ SparkPlan plan = result.queryExecution().executedPlan();
+
+ Map<String, SQLMetric> metricsMap = findMetrics(plan,
AddedPositionalDeleteFiles.NAME);
+
+ assertThat(metricsMap).isNotNull();
+
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ AddedPositionalDeleteFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(1));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ RemovedDataFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(0));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ AddedPositionalDeletes.NAME, metric ->
assertThat(metric.value()).isEqualTo(1));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ TotalDeleteFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(1));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ TotalPositionalDeletes.NAME, metric ->
assertThat(metric.value()).isEqualTo(1));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ AddedDeleteFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(1));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ AddedFileSizeInBytes.NAME, metric ->
assertThat(metric.value()).isGreaterThan(0));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(TotalDataFiles.NAME, metric ->
assertThat(metric.value()).isEqualTo(1));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(TotalRecords.NAME, metric ->
assertThat(metric.value()).isEqualTo(100));
+ assertThat(metricsMap)
+ .hasEntrySatisfying(
+ TotalFileSizeInBytes.NAME, metric ->
assertThat(metric.value()).isGreaterThan(0));
+
+ // Verify other metrics are 0
+ String[] zeroMetrics = {
+ AddedDataFiles.NAME,
+ AddedEqualityDeleteFiles.NAME,
+ AddedEqualityDeletes.NAME,
+ AddedRecords.NAME,
+ RemovedDeleteFiles.NAME,
+ RemovedEqualityDeleteFiles.NAME,
+ RemovedPositionalDeleteFiles.NAME,
+ RemovedEqualityDeletes.NAME,
+ RemovedPositionalDeletes.NAME,
+ RemovedRecords.NAME,
+ RemovedFileSizeInBytes.NAME,
+ TotalEqualityDeletes.NAME
+ };
+
+ for (String metric : zeroMetrics) {
+ assertThat(metricsMap)
+ .hasEntrySatisfying(metric, m ->
assertThat(m.value()).as(metric).isEqualTo(0));
+ }
+ }
+
+ private Map<String, SQLMetric> findMetrics(SparkPlan plan, String
metricName) {
+ Map<String, SQLMetric> metrics =
CollectionConverters.asJava(plan.metrics());
+ if (metrics.containsKey(metricName)) {
+ return metrics;
+ }
+
+ for (SparkPlan child : CollectionConverters.asJava(plan.children())) {
+ Map<String, SQLMetric> result = findMetrics(child, metricName);
+ if (result != null) {
+ return result;
+ }
+ }
+
+ for (Object child : CollectionConverters.asJava(plan.innerChildren())) {
+ if (child instanceof SparkPlan) {
+ Map<String, SQLMetric> result = findMetrics((SparkPlan) child,
metricName);
+ if (result != null) {
+ return result;
+ }
+ }
+ }
+
+ return null;
+ }
+}