This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 305ab1544c API, Core: Add scan metrics reporter and logging (#5268)
305ab1544c is described below

commit 305ab1544c7a7b844386fe9aca7ce9951936c021
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Wed Aug 3 20:29:07 2022 +0200

    API, Core: Add scan metrics reporter and logging (#5268)
---
 .../org/apache/iceberg/io/CloseableIterable.java   |  90 +++++++
 .../org/apache/iceberg/io/CloseableIterator.java   |  23 ++
 .../org/apache/iceberg/metrics/IntCounter.java     |  19 ++
 .../iceberg/metrics/LoggingScanReporter.java       |  36 +++
 .../org/apache/iceberg/metrics/LongCounter.java    |  19 ++
 .../org/apache/iceberg/metrics/MetricsContext.java |  15 +-
 .../org/apache/iceberg/metrics/ScanReport.java     | 216 +++++++++++++++++
 .../org/apache/iceberg/metrics/ScanReporter.java   |  37 +++
 .../apache/iceberg/io/TestCloseableIterable.java   | 129 ++++++++++
 .../org/apache/iceberg/metrics/TestScanReport.java | 132 ++++++++++
 .../main/java/org/apache/iceberg/BaseTable.java    |  15 +-
 .../java/org/apache/iceberg/BaseTableScan.java     |  31 ++-
 .../java/org/apache/iceberg/DataTableScan.java     |  10 +-
 .../java/org/apache/iceberg/DeleteFileIndex.java   |   7 +
 .../java/org/apache/iceberg/ManifestGroup.java     |  53 ++++-
 .../java/org/apache/iceberg/ManifestReader.java    |   7 +
 .../java/org/apache/iceberg/TableScanContext.java  |  65 ++++-
 .../apache/iceberg/rest/RESTSessionCatalog.java    |   6 +-
 .../test/java/org/apache/iceberg/TestTables.java   |  27 +++
 .../iceberg/TestScanPlanningAndReporting.java      | 265 +++++++++++++++++++++
 20 files changed, 1164 insertions(+), 38 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java 
b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 068584689e..7cea2c9d1e 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -26,6 +26,7 @@ import java.util.NoSuchElementException;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
@@ -73,6 +74,36 @@ public interface CloseableIterable<T> extends Iterable<T>, 
Closeable {
     };
   }
 
+  /**
+   * Will run the given runnable when {@link CloseableIterable#close()} has 
been called.
+   *
+   * @param iterable The underlying {@link CloseableIterable} to iterate over
+   * @param onCompletionRunnable The runnable to run after the underlying 
iterable was closed
+   * @param <E> The type of the underlying iterable
+   * @return A new {@link CloseableIterable} where the runnable will be 
executed as the final step
+   *     after {@link CloseableIterable#close()} has been called
+   */
+  static <E> CloseableIterable<E> whenComplete(
+      CloseableIterable<E> iterable, Runnable onCompletionRunnable) {
+    Preconditions.checkNotNull(
+        onCompletionRunnable, "Cannot execute a null Runnable after 
completion");
+    return new CloseableIterable<E>() {
+      @Override
+      public void close() throws IOException {
+        try {
+          iterable.close();
+        } finally {
+          onCompletionRunnable.run();
+        }
+      }
+
+      @Override
+      public CloseableIterator<E> iterator() {
+        return CloseableIterator.withClose(iterable.iterator());
+      }
+    };
+  }
+
   static <E> CloseableIterable<E> filter(CloseableIterable<E> iterable, 
Predicate<E> pred) {
     return combine(
         () ->
@@ -85,6 +116,65 @@ public interface CloseableIterable<T> extends Iterable<T>, 
Closeable {
         iterable);
   }
 
+  /**
+   * Filters the given {@link CloseableIterable} and counts the number of 
elements that do not match
+   * the predicate by incrementing the {@link MetricsContext.Counter}.
+   *
+   * @param skipCounter The {@link MetricsContext.Counter} instance to 
increment on each skipped
+   *     item during filtering.
+   * @param iterable The underlying {@link CloseableIterable} to filter.
+   * @param <E> The underlying type to be iterated.
+   * @return A filtered {@link CloseableIterable} where the given skipCounter 
is incremented
+   *     whenever the predicate does not match.
+   */
+  static <E> CloseableIterable<E> filter(
+      MetricsContext.Counter<?> skipCounter, CloseableIterable<E> iterable, 
Predicate<E> pred) {
+    Preconditions.checkArgument(null != skipCounter, "Invalid counter: null");
+    Preconditions.checkArgument(null != iterable, "Invalid iterable: null");
+    Preconditions.checkArgument(null != pred, "Invalid predicate: null");
+    return combine(
+        () ->
+            new FilterIterator<E>(iterable.iterator()) {
+              @Override
+              protected boolean shouldKeep(E item) {
+                boolean matches = pred.test(item);
+                if (!matches) {
+                  skipCounter.increment();
+                }
+                return matches;
+              }
+            },
+        iterable);
+  }
+
+  /**
+   * Counts the number of elements in the given {@link CloseableIterable} by 
incrementing the {@link
+   * MetricsContext.Counter} instance for each {@link Iterator#next()} call.
+   *
+   * @param counter The {@link MetricsContext.Counter} instance to increment 
on each {@link
+   *     Iterator#next()} call.
+   * @param iterable The underlying {@link CloseableIterable} to count
+   * @param <T> The underlying type to be iterated.
+   * @return A {@link CloseableIterable} that increments the given counter on 
each {@link
+   *     Iterator#next()} call.
+   */
+  static <T> CloseableIterable<T> count(
+      MetricsContext.Counter<?> counter, CloseableIterable<T> iterable) {
+    Preconditions.checkArgument(null != counter, "Invalid counter: null");
+    Preconditions.checkArgument(null != iterable, "Invalid iterable: null");
+    return new CloseableIterable<T>() {
+      @Override
+      public CloseableIterator<T> iterator() {
+        return CloseableIterator.count(counter, iterable.iterator());
+      }
+
+      @Override
+      public void close() throws IOException {
+        iterable.close();
+      }
+    };
+  }
+
   static <I, O> CloseableIterable<O> transform(
       CloseableIterable<I> iterable, Function<I, O> transform) {
     Preconditions.checkNotNull(transform, "Cannot apply a null transform");
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java 
b/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
index 3a79b84353..1661acdcda 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.function.Function;
+import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public interface CloseableIterator<T> extends Iterator<T>, Closeable {
@@ -77,4 +78,26 @@ public interface CloseableIterator<T> extends Iterator<T>, 
Closeable {
       }
     };
   }
+
+  static <T> CloseableIterator<T> count(
+      MetricsContext.Counter<?> counter, CloseableIterator<T> iterator) {
+    return new CloseableIterator<T>() {
+      @Override
+      public void close() throws IOException {
+        iterator.close();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public T next() {
+        T next = iterator.next();
+        counter.increment();
+        return next;
+      }
+    };
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java 
b/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
index 33b79920d3..4c9724e2a9 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
@@ -27,6 +27,25 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
  * {@link Integer} to count events.
  */
 class IntCounter implements MetricsContext.Counter<Integer> {
+  static final IntCounter NOOP =
+      new IntCounter("NOOP", MetricsContext.Unit.UNDEFINED) {
+        @Override
+        public void increment() {}
+
+        @Override
+        public void increment(Integer amount) {}
+
+        @Override
+        public Optional<Integer> count() {
+          return Optional.of(value());
+        }
+
+        @Override
+        public Integer value() {
+          return 0;
+        }
+      };
+
   private final AtomicInteger counter;
   private final String name;
   private final MetricsContext.Unit unit;
diff --git 
a/api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java 
b/api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java
new file mode 100644
index 0000000000..2c9d9f1ad6
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A default {@link ScanReporter} implementation that logs the {@link 
ScanReport} to the log file.
+ */
+public class LoggingScanReporter implements ScanReporter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LoggingScanReporter.class);
+
+  @Override
+  public void reportScan(ScanReport scanReport) {
+    Preconditions.checkArgument(null != scanReport, "Invalid scan report: 
null");
+    LOG.info("Completed scan planning: {}", scanReport);
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java 
b/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
index 46d8772de0..071ca825e4 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
@@ -27,6 +27,25 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
  * {@link Long} to count events.
  */
 class LongCounter implements MetricsContext.Counter<Long> {
+  static final LongCounter NOOP =
+      new LongCounter("NOOP", MetricsContext.Unit.UNDEFINED) {
+        @Override
+        public void increment() {}
+
+        @Override
+        public void increment(Long amount) {}
+
+        @Override
+        public Optional<Long> count() {
+          return Optional.of(value());
+        }
+
+        @Override
+        public Long value() {
+          return 0L;
+        }
+      };
+
   private final AtomicLong counter;
   private final String name;
   private final MetricsContext.Unit unit;
diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java 
b/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
index f30cf78740..68aed0f763 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
@@ -135,13 +135,16 @@ public interface MetricsContext extends Serializable {
 
       @Override
       public <T extends Number> Counter<T> counter(String name, Class<T> type, 
Unit unit) {
-        return new Counter<T>() {
-          @Override
-          public void increment() {}
+        if (Integer.class.equals(type)) {
+          return (Counter<T>) IntCounter.NOOP;
+        }
 
-          @Override
-          public void increment(T amount) {}
-        };
+        if (Long.class.equals(type)) {
+          return (Counter<T>) LongCounter.NOOP;
+        }
+
+        throw new IllegalArgumentException(
+            String.format("Counter for type %s is not supported", 
type.getName()));
       }
     };
   }
diff --git a/api/src/main/java/org/apache/iceberg/metrics/ScanReport.java 
b/api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
new file mode 100644
index 0000000000..2eded304e8
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.MetricsContext.Counter;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** A Table Scan report that contains all relevant information from a Table 
Scan. */
+public class ScanReport implements Serializable {
+
+  private final String tableName;
+  private final long snapshotId;
+  private final Expression filter;
+  private final Schema projection;
+  private final ScanMetrics scanMetrics;
+
+  private ScanReport(
+      String tableName,
+      long snapshotId,
+      Expression filter,
+      Schema projection,
+      ScanMetrics scanMetrics) {
+    this.tableName = tableName;
+    this.snapshotId = snapshotId;
+    this.filter = filter;
+    this.projection = projection;
+    this.scanMetrics = scanMetrics;
+  }
+
+  public String tableName() {
+    return tableName;
+  }
+
+  public long snapshotId() {
+    return snapshotId;
+  }
+
+  public Expression filter() {
+    return filter;
+  }
+
+  public Schema projection() {
+    return projection;
+  }
+
+  public ScanMetrics scanMetrics() {
+    return scanMetrics;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("tableName", tableName)
+        .add("snapshotId", snapshotId)
+        .add("filter", filter)
+        .add("projection", projection)
+        .add("scanMetrics", scanMetrics)
+        .toString();
+  }
+
+  public static class Builder {
+    private String tableName;
+    private long snapshotId = -1L;
+    private Expression filter;
+    private Schema projection;
+    private ScanMetrics scanMetrics;
+
+    private Builder() {}
+
+    public Builder withTableName(String newTableName) {
+      this.tableName = newTableName;
+      return this;
+    }
+
+    public Builder withSnapshotId(long newSnapshotId) {
+      this.snapshotId = newSnapshotId;
+      return this;
+    }
+
+    public Builder withFilter(Expression newFilter) {
+      this.filter = newFilter;
+      return this;
+    }
+
+    public Builder withProjection(Schema newProjection) {
+      this.projection = newProjection;
+      return this;
+    }
+
+    public Builder fromScanMetrics(ScanMetrics newScanMetrics) {
+      this.scanMetrics = newScanMetrics;
+      return this;
+    }
+
+    public ScanReport build() {
+      Preconditions.checkArgument(null != tableName, "Invalid table name: 
null");
+      Preconditions.checkArgument(null != filter, "Invalid expression filter: 
null");
+      Preconditions.checkArgument(null != projection, "Invalid schema 
projection: null");
+      Preconditions.checkArgument(null != scanMetrics, "Invalid scan metrics: 
null");
+      return new ScanReport(tableName, snapshotId, filter, projection, 
scanMetrics);
+    }
+  }
+
+  /** Carries all metrics for a particular scan */
+  public static class ScanMetrics {
+    public static final ScanMetrics NOOP = new 
ScanMetrics(MetricsContext.nullMetrics());
+    private final Timer totalPlanningDuration;
+    private final Counter<Integer> resultDataFiles;
+    private final Counter<Integer> resultDeleteFiles;
+    private final Counter<Integer> totalDataManifests;
+    private final Counter<Integer> totalDeleteManifests;
+    private final Counter<Integer> scannedDataManifests;
+    private final Counter<Integer> skippedDataManifests;
+    private final Counter<Long> totalFileSizeInBytes;
+    private final Counter<Long> totalDeleteFileSizeInBytes;
+
+    public ScanMetrics(MetricsContext metricsContext) {
+      Preconditions.checkArgument(null != metricsContext, "Invalid metrics 
context: null");
+      this.totalPlanningDuration =
+          metricsContext.timer("totalPlanningDuration", TimeUnit.NANOSECONDS);
+      this.resultDataFiles =
+          metricsContext.counter("resultDataFiles", Integer.class, 
MetricsContext.Unit.COUNT);
+      this.resultDeleteFiles =
+          metricsContext.counter("resultDeleteFiles", Integer.class, 
MetricsContext.Unit.COUNT);
+      this.scannedDataManifests =
+          metricsContext.counter("scannedDataManifests", Integer.class, 
MetricsContext.Unit.COUNT);
+      this.totalDataManifests =
+          metricsContext.counter("totalDataManifests", Integer.class, 
MetricsContext.Unit.COUNT);
+      this.totalDeleteManifests =
+          metricsContext.counter("totalDeleteManifests", Integer.class, 
MetricsContext.Unit.COUNT);
+      this.totalFileSizeInBytes =
+          metricsContext.counter("totalFileSizeInBytes", Long.class, 
MetricsContext.Unit.BYTES);
+      this.totalDeleteFileSizeInBytes =
+          metricsContext.counter(
+              "totalDeleteFileSizeInBytes", Long.class, 
MetricsContext.Unit.BYTES);
+      this.skippedDataManifests =
+          metricsContext.counter("skippedDataManifests", Integer.class, 
MetricsContext.Unit.COUNT);
+    }
+
+    public Timer totalPlanningDuration() {
+      return totalPlanningDuration;
+    }
+
+    public Counter<Integer> resultDataFiles() {
+      return resultDataFiles;
+    }
+
+    public Counter<Integer> resultDeleteFiles() {
+      return resultDeleteFiles;
+    }
+
+    public Counter<Integer> scannedDataManifests() {
+      return scannedDataManifests;
+    }
+
+    public Counter<Integer> totalDataManifests() {
+      return totalDataManifests;
+    }
+
+    public Counter<Integer> totalDeleteManifests() {
+      return totalDeleteManifests;
+    }
+
+    public Counter<Long> totalFileSizeInBytes() {
+      return totalFileSizeInBytes;
+    }
+
+    public Counter<Long> totalDeleteFileSizeInBytes() {
+      return totalDeleteFileSizeInBytes;
+    }
+
+    public Counter<Integer> skippedDataManifests() {
+      return skippedDataManifests;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .addValue(totalPlanningDuration)
+          .addValue(resultDataFiles)
+          .addValue(resultDeleteFiles)
+          .addValue(scannedDataManifests)
+          .addValue(skippedDataManifests)
+          .addValue(totalDataManifests)
+          .addValue(totalDeleteManifests)
+          .addValue(totalFileSizeInBytes)
+          .addValue(totalDeleteFileSizeInBytes)
+          .toString();
+    }
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/ScanReporter.java 
b/api/src/main/java/org/apache/iceberg/metrics/ScanReporter.java
new file mode 100644
index 0000000000..958c03ef8f
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/metrics/ScanReporter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.metrics.ScanReport.ScanMetrics;
+
+/**
+ * This interface defines the basic API for a Table Scan Reporter that can be 
used to report
+ * different metrics after a Table scan is done.
+ */
+@FunctionalInterface
+public interface ScanReporter {
+
+  /**
+   * Indicates that a Scan is done by reporting a {@link ScanReport}. A {@link 
ScanReport} is
+   * usually directly derived from a {@link ScanMetrics} instance.
+   *
+   * @param scanReport The {@link ScanReport} to report.
+   */
+  void reportScan(ScanReport scanReport);
+}
diff --git a/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java 
b/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
index 826dcb4fbf..68895cfc43 100644
--- a/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
+++ b/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
@@ -19,12 +19,16 @@
 package org.apache.iceberg.io;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import org.apache.iceberg.AssertHelpers;
 import 
org.apache.iceberg.io.TestableCloseableIterable.TestableCloseableIterator;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
@@ -93,6 +97,71 @@ public class TestCloseableIterable {
         () -> Iterables.getLast(concat5));
   }
 
+  @Test
+  public void testWithCompletionRunnable() throws IOException {
+    AtomicInteger completionCounter = new AtomicInteger(0);
+    List<Integer> items = Lists.newArrayList(1, 2, 3, 4, 5);
+    Assertions.assertThatThrownBy(
+            () -> 
CloseableIterable.whenComplete(CloseableIterable.combine(items, () -> {}), 
null))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("Cannot execute a null Runnable after completion");
+
+    try (CloseableIterable<Integer> iter =
+        CloseableIterable.whenComplete(
+            CloseableIterable.combine(items, () -> {}), 
completionCounter::incrementAndGet)) {
+      iter.forEach(val -> 
Assertions.assertThat(completionCounter.get()).isEqualTo(0));
+    }
+    Assertions.assertThat(completionCounter.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void testWithCompletionRunnableAndEmptyIterable() throws IOException {
+    AtomicInteger completionCounter = new AtomicInteger(0);
+    CloseableIterable<Integer> empty = CloseableIterable.empty();
+    try (CloseableIterable<Integer> iter =
+        CloseableIterable.whenComplete(
+            CloseableIterable.combine(empty, () -> {}), 
completionCounter::incrementAndGet)) {
+      iter.forEach(val -> 
Assertions.assertThat(completionCounter.get()).isEqualTo(0));
+    }
+    Assertions.assertThat(completionCounter.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void testWithCompletionRunnableAndUnclosedIterable() {
+    AtomicInteger completionCounter = new AtomicInteger(0);
+    List<Integer> items = Lists.newArrayList(1, 2, 3, 4, 5);
+    CloseableIterable<Integer> iter =
+        CloseableIterable.whenComplete(
+            CloseableIterable.combine(items, () -> {}), 
completionCounter::incrementAndGet);
+    iter.forEach(val -> 
Assertions.assertThat(completionCounter.get()).isEqualTo(0));
+    // given that we never close iter, the completionRunnable is never called
+    Assertions.assertThat(completionCounter.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void testWithCompletionRunnableWhenIterableThrows() {
+    AtomicInteger completionCounter = new AtomicInteger(0);
+    List<Integer> items = Lists.newArrayList(1, 2, 3, 4, 5);
+
+    Assertions.assertThatThrownBy(
+            () -> {
+              try (CloseableIterable<Integer> iter =
+                  CloseableIterable.whenComplete(
+                      CloseableIterable.combine(
+                          items,
+                          () -> {
+                            throw new RuntimeException("expected");
+                          }),
+                      completionCounter::incrementAndGet)) {
+                iter.forEach(val -> 
Assertions.assertThat(completionCounter.get()).isEqualTo(0));
+              }
+            })
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("expected");
+
+    Assertions.assertThat(completionCounter.get()).isEqualTo(1);
+  }
+
   @Test
   public void testConcatWithEmpty() {
     AtomicInteger counter = new AtomicInteger(0);
@@ -131,4 +200,64 @@ public class TestCloseableIterable {
     }
     Assertions.assertThat(counter.get()).isEqualTo(items.size());
   }
+
+  @Test
+  public void count() {
+    MetricsContext.Counter<Integer> counter =
+        new DefaultMetricsContext().counter("x", Integer.class, 
MetricsContext.Unit.COUNT);
+    CloseableIterable<Integer> items =
+        CloseableIterable.count(
+            counter, CloseableIterable.withNoopClose(Arrays.asList(1, 2, 3, 4, 
5)));
+    Assertions.assertThat(counter.value()).isEqualTo(0);
+    items.forEach(item -> {});
+    Assertions.assertThat(counter.value()).isEqualTo(5);
+  }
+
+  @Test
+  public void countSkipped() {
+    MetricsContext.Counter<Integer> counter =
+        new DefaultMetricsContext().counter("x", Integer.class, 
MetricsContext.Unit.COUNT);
+    CloseableIterable<Integer> items =
+        CloseableIterable.filter(
+            counter,
+            CloseableIterable.withNoopClose(Arrays.asList(1, 2, 3, 4, 5)),
+            x -> x % 2 == 0);
+    Assertions.assertThat(counter.value()).isEqualTo(0);
+    items.forEach(item -> {});
+    Assertions.assertThat(counter.value()).isEqualTo(3);
+  }
+
+  @Test
+  public void countNullCheck() {
+    Assertions.assertThatThrownBy(() -> CloseableIterable.count(null, 
CloseableIterable.empty()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid counter: null");
+
+    MetricsContext.Counter<Integer> counter =
+        new DefaultMetricsContext().counter("x", Integer.class, 
MetricsContext.Unit.COUNT);
+    Assertions.assertThatThrownBy(() -> CloseableIterable.count(counter, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid iterable: null");
+  }
+
+  @Test
+  public void countSkippedNullCheck() {
+    Assertions.assertThatThrownBy(
+            () ->
+                CloseableIterable.filter(null, CloseableIterable.empty(), 
Predicate.isEqual(true)))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid counter: null");
+
+    MetricsContext.Counter<Integer> counter =
+        new DefaultMetricsContext().counter("x", Integer.class, 
MetricsContext.Unit.COUNT);
+    Assertions.assertThatThrownBy(
+            () -> CloseableIterable.filter(counter, null, 
Predicate.isEqual(true)))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid iterable: null");
+
+    Assertions.assertThatThrownBy(
+            () -> CloseableIterable.filter(counter, CloseableIterable.empty(), 
null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid predicate: null");
+  }
 }
diff --git a/api/src/test/java/org/apache/iceberg/metrics/TestScanReport.java 
b/api/src/test/java/org/apache/iceberg/metrics/TestScanReport.java
new file mode 100644
index 0000000000..ed3bce44de
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/metrics/TestScanReport.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.True;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestScanReport {
+
+  @Test
+  public void missingFields() {
+    Assertions.assertThatThrownBy(() -> ScanReport.builder().build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid table name: null");
+
+    Assertions.assertThatThrownBy(() -> ScanReport.builder().withTableName("x").build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expression filter: null");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                ScanReport.builder()
+                    .withTableName("x")
+                    .withFilter(Expressions.alwaysTrue())
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid schema projection: null");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                ScanReport.builder()
+                    .withTableName("x")
+                    .withFilter(Expressions.alwaysTrue())
+                    .withProjection(
+                        new Schema(
+                            Types.NestedField.required(1, "c1", Types.StringType.get(), "c1")))
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid scan metrics: null");
+  }
+
+  @Test
+  public void fromEmptyScanMetrics() {
+    String tableName = "x";
+    True filter = Expressions.alwaysTrue();
+    Schema projection =
+        new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
+    ScanReport scanReport =
+        ScanReport.builder()
+            .withTableName(tableName)
+            .withFilter(filter)
+            .withProjection(projection)
+            .fromScanMetrics(ScanReport.ScanMetrics.NOOP)
+            .build();
+
+    Assertions.assertThat(scanReport.tableName()).isEqualTo(tableName);
+    Assertions.assertThat(scanReport.projection()).isEqualTo(projection);
+    Assertions.assertThat(scanReport.filter()).isEqualTo(filter);
+    Assertions.assertThat(scanReport.snapshotId()).isEqualTo(-1);
+    Assertions.assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isEqualTo(Duration.ZERO);
+    Assertions.assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(0);
+    Assertions.assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(0);
+    Assertions.assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(0);
+    Assertions.assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(0L);
+  }
+
+  @Test
+  public void fromScanMetrics() {
+    ScanReport.ScanMetrics scanMetrics = new ScanReport.ScanMetrics(new DefaultMetricsContext());
+    scanMetrics.totalPlanningDuration().record(10, TimeUnit.MINUTES);
+    scanMetrics.resultDataFiles().increment(5);
+    scanMetrics.resultDeleteFiles().increment(5);
+    scanMetrics.scannedDataManifests().increment(5);
+    scanMetrics.totalFileSizeInBytes().increment(1024L);
+    scanMetrics.totalDataManifests().increment(5);
+
+    String tableName = "x";
+    True filter = Expressions.alwaysTrue();
+    Schema projection =
+        new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
+    ScanReport scanReport =
+        ScanReport.builder()
+            .withTableName(tableName)
+            .withFilter(filter)
+            .withProjection(projection)
+            .withSnapshotId(23L)
+            .fromScanMetrics(scanMetrics)
+            .build();
+
+    Assertions.assertThat(scanReport.tableName()).isEqualTo(tableName);
+    Assertions.assertThat(scanReport.projection()).isEqualTo(projection);
+    Assertions.assertThat(scanReport.filter()).isEqualTo(filter);
+    Assertions.assertThat(scanReport.snapshotId()).isEqualTo(23L);
+    Assertions.assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isEqualTo(Duration.ofMinutes(10L));
+    Assertions.assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(5);
+    Assertions.assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(5);
+    Assertions.assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(5);
+    Assertions.assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(5);
+    Assertions.assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(1024L);
+  }
+
+  @Test
+  public void nullScanMetrics() {
+    Assertions.assertThatThrownBy(() -> new ScanReport.ScanMetrics(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid metrics context: null");
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java 
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 00842f6d77..9605b07d8d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReporter;
 
 /**
  * Base {@link Table} implementation.
@@ -37,10 +39,18 @@ import org.apache.iceberg.io.LocationProvider;
 public class BaseTable implements Table, HasTableOperations, Serializable {
   private final TableOperations ops;
   private final String name;
+  private final ScanReporter scanReporter;
 
   public BaseTable(TableOperations ops, String name) {
     this.ops = ops;
     this.name = name;
+    this.scanReporter = new LoggingScanReporter();
+  }
+
+  public BaseTable(TableOperations ops, String name, ScanReporter scanReporter) {
+    this.ops = ops;
+    this.name = name;
+    this.scanReporter = scanReporter;
   }
 
   @Override
@@ -60,12 +70,13 @@ public class BaseTable implements Table, 
HasTableOperations, Serializable {
 
   @Override
   public TableScan newScan() {
-    return new DataTableScan(ops, this);
+    return new DataTableScan(ops, this, schema(), new TableScanContext().reportWith(scanReporter));
   }
 
   @Override
   public IncrementalAppendScan newIncrementalAppendScan() {
-    return new BaseIncrementalAppendScan(ops, this);
+    return new BaseIncrementalAppendScan(
+        ops, this, schema(), new TableScanContext().reportWith(scanReporter));
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java 
b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 98a7df0325..5f48786f5d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -25,6 +25,9 @@ import org.apache.iceberg.events.ScanEvent;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.DateTimeUtil;
@@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
 abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask, 
CombinedScanTask>
     implements TableScan {
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseTableScan.class);
+  private ScanReport.ScanMetrics scanMetrics;
 
   protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
     this(ops, table, schema, new TableScanContext());
@@ -69,6 +73,14 @@ abstract class BaseTableScan extends BaseScan<TableScan, 
FileScanTask, CombinedS
 
   protected abstract CloseableIterable<FileScanTask> doPlanFiles();
 
+  protected ScanReport.ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = new ScanReport.ScanMetrics(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
   @Override
   public Table table() {
     return super.table();
@@ -121,9 +133,22 @@ abstract class BaseTableScan extends BaseScan<TableScan, 
FileScanTask, CombinedS
           ExpressionUtil.toSanitizedString(filter()));
 
       Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), 
filter(), schema()));
-
-      return doPlanFiles();
-
+      Timer.Timed scanDuration = scanMetrics().totalPlanningDuration().start();
+
+      return CloseableIterable.whenComplete(
+          doPlanFiles(),
+          () -> {
+            scanDuration.stop();
+            ScanReport scanReport =
+                ScanReport.builder()
+                    .withFilter(ExpressionUtil.sanitize(filter()))
+                    .withProjection(schema())
+                    .withTableName(table().name())
+                    .withSnapshotId(snapshot.snapshotId())
+                    .fromScanMetrics(scanMetrics())
+                    .build();
+            context().scanReporter().reportScan(scanReport);
+          });
     } else {
       LOG.info("Scanning empty table {}", table());
       return CloseableIterable.empty();
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java 
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 46ae8ae0ed..678dd8884a 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg;
 
+import java.util.List;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -104,19 +105,24 @@ public class DataTableScan extends BaseTableScan {
     Snapshot snapshot = snapshot();
 
     FileIO io = table().io();
+    List<ManifestFile> dataManifests = snapshot.dataManifests(io);
+    List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);
+    scanMetrics().totalDataManifests().increment(dataManifests.size());
+    scanMetrics().totalDeleteManifests().increment(deleteManifests.size());
     ManifestGroup manifestGroup =
-        new ManifestGroup(io, snapshot.dataManifests(io), 
snapshot.deleteManifests(io))
+        new ManifestGroup(io, dataManifests, deleteManifests)
             .caseSensitive(isCaseSensitive())
             .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
             .filterData(filter())
             .specsById(table().specs())
+            .scanMetrics(scanMetrics())
             .ignoreDeleted();
 
     if (shouldIgnoreResiduals()) {
       manifestGroup = manifestGroup.ignoreResiduals();
     }
 
-    if (snapshot.dataManifests(io).size() > 1
+    if (dataManifests.size() > 1
         && (PLAN_SCANS_WITH_WORKER_POOL || 
context().planWithCustomizedExecutor())) {
       manifestGroup = manifestGroup.planWith(planExecutor());
     }
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java 
b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 877c83cf8a..58cadf474b 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -363,6 +364,7 @@ class DeleteFileIndex {
     private PartitionSet partitionSet = null;
     private boolean caseSensitive = true;
     private ExecutorService executorService = null;
+    private ScanReport.ScanMetrics scanMetrics = ScanReport.ScanMetrics.NOOP;
 
     Builder(FileIO io, Set<ManifestFile> deleteManifests) {
       this.io = io;
@@ -404,6 +406,11 @@ class DeleteFileIndex {
       return this;
     }
 
+    Builder scanMetrics(ScanReport.ScanMetrics newScanMetrics) {
+      this.scanMetrics = newScanMetrics;
+      return this;
+    }
+
     DeleteFileIndex build() {
       // read all of the matching delete manifests in parallel and accumulate 
the matching files in
       // a queue
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java 
b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index dd814b0aec..beceefe89a 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -60,6 +61,7 @@ class ManifestGroup {
   private List<String> columns;
   private boolean caseSensitive;
   private ExecutorService executorService;
+  private ScanReport.ScanMetrics scanMetrics;
 
   ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
     this(
@@ -83,6 +85,7 @@ class ManifestGroup {
     this.caseSensitive = true;
     this.manifestPredicate = m -> true;
     this.manifestEntryPredicate = e -> true;
+    this.scanMetrics = ScanReport.ScanMetrics.NOOP;
   }
 
   ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
@@ -119,6 +122,11 @@ class ManifestGroup {
     return this;
   }
 
+  ManifestGroup scanMetrics(ScanReport.ScanMetrics metrics) {
+    this.scanMetrics = metrics;
+    return this;
+  }
+
   ManifestGroup ignoreDeleted() {
     this.ignoreDeleted = true;
     return this;
@@ -171,7 +179,7 @@ class ManifestGroup {
                   return ResidualEvaluator.of(spec, filter, caseSensitive);
                 });
 
-    DeleteFileIndex deleteFiles = deleteIndexBuilder.build();
+    DeleteFileIndex deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build();
 
     boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
     if (!deleteFiles.isEmpty()) {
@@ -184,7 +192,7 @@ class ManifestGroup {
                 specId -> {
                   PartitionSpec spec = specsById.get(specId);
                   ResidualEvaluator residuals = residualCache.get(specId);
-                  return new TaskContext(spec, deleteFiles, residuals, 
dropStats);
+                  return new TaskContext(spec, deleteFiles, residuals, 
dropStats, scanMetrics);
                 });
 
     Iterable<CloseableIterable<T>> tasks =
@@ -239,11 +247,14 @@ class ManifestGroup {
       evaluator = null;
     }
 
-    Iterable<ManifestFile> matchingManifests =
+    CloseableIterable<ManifestFile> closeableDataManifests =
+        CloseableIterable.withNoopClose(dataManifests);
+    CloseableIterable<ManifestFile> matchingManifests =
         evalCache == null
-            ? dataManifests
-            : Iterables.filter(
-                dataManifests,
+            ? closeableDataManifests
+            : CloseableIterable.filter(
+                scanMetrics.skippedDataManifests(),
+                closeableDataManifests,
                 manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
 
     if (ignoreDeleted) {
@@ -251,7 +262,8 @@ class ManifestGroup {
       // remove any manifests that don't have any existing or added files. if 
either the added or
       // existing files count is missing, the manifest must be scanned.
       matchingManifests =
-          Iterables.filter(
+          CloseableIterable.filter(
+              scanMetrics.skippedDataManifests(),
               matchingManifests,
               manifest -> manifest.hasAddedFiles() || 
manifest.hasExistingFiles());
     }
@@ -261,12 +273,17 @@ class ManifestGroup {
       // remove any manifests that don't have any deleted or added files. if 
either the added or
       // deleted files count is missing, the manifest must be scanned.
       matchingManifests =
-          Iterables.filter(
+          CloseableIterable.filter(
+              scanMetrics.skippedDataManifests(),
               matchingManifests,
               manifest -> manifest.hasAddedFiles() || 
manifest.hasDeletedFiles());
     }
 
-    matchingManifests = Iterables.filter(matchingManifests, 
manifestPredicate::test);
+    matchingManifests =
+        CloseableIterable.filter(
+            scanMetrics.skippedDataManifests(), matchingManifests, 
manifestPredicate);
+    matchingManifests =
+        CloseableIterable.count(scanMetrics.scannedDataManifests(), 
matchingManifests);
 
     return Iterables.transform(
         matchingManifests,
@@ -281,7 +298,8 @@ class ManifestGroup {
                         .filterRows(dataFilter)
                         .filterPartitions(partitionFilter)
                         .caseSensitive(caseSensitive)
-                        .select(columns);
+                        .select(columns)
+                        .scanMetrics(scanMetrics);
 
                 CloseableIterable<ManifestEntry<DataFile>> entries;
                 if (ignoreDeleted) {
@@ -325,6 +343,12 @@ class ManifestGroup {
         entry -> {
           DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
           DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
+          for (DeleteFile deleteFile : deleteFiles) {
+            ctx.scanMetrics().totalDeleteFileSizeInBytes().increment(deleteFile.fileSizeInBytes());
+          }
+          ctx.scanMetrics().totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
+          ctx.scanMetrics().resultDataFiles().increment();
+          ctx.scanMetrics().resultDeleteFiles().increment(deleteFiles.length);
           return new BaseFileScanTask(
               dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), 
ctx.residuals());
         });
@@ -342,17 +366,20 @@ class ManifestGroup {
     private final DeleteFileIndex deletes;
     private final ResidualEvaluator residuals;
     private final boolean dropStats;
+    private final ScanReport.ScanMetrics scanMetrics;
 
     TaskContext(
         PartitionSpec spec,
         DeleteFileIndex deletes,
         ResidualEvaluator residuals,
-        boolean dropStats) {
+        boolean dropStats,
+        ScanReport.ScanMetrics scanMetrics) {
       this.schemaAsString = SchemaParser.toJson(spec.schema());
       this.specAsString = PartitionSpecParser.toJson(spec);
       this.deletes = deletes;
       this.residuals = residuals;
       this.dropStats = dropStats;
+      this.scanMetrics = scanMetrics;
     }
 
     String schemaAsString() {
@@ -374,5 +401,9 @@ class ManifestGroup {
     boolean shouldKeepStats() {
       return !dropStats;
     }
+
+    public ScanReport.ScanMetrics scanMetrics() {
+      return scanMetrics;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java 
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 19875e8084..7fc5678fbd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -92,6 +93,7 @@ public class ManifestReader<F extends ContentFile<F>> extends 
CloseableGroup
   private Schema fileProjection = null;
   private Collection<String> columns = null;
   private boolean caseSensitive = true;
+  private ScanReport.ScanMetrics scanMetrics = ScanReport.ScanMetrics.NOOP;
 
   // lazily initialized
   private Evaluator lazyEvaluator = null;
@@ -186,6 +188,11 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
     return this;
   }
 
+  ManifestReader<F> scanMetrics(ScanReport.ScanMetrics newScanMetrics) {
+    this.scanMetrics = newScanMetrics;
+    return this;
+  }
+
   CloseableIterable<ManifestEntry<F>> entries() {
     if ((rowFilter != null && rowFilter != Expressions.alwaysTrue())
         || (partFilter != null && partFilter != Expressions.alwaysTrue())
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java 
b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index beca0db87c..5ce966d034 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -24,6 +24,8 @@ import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.ThreadPools;
@@ -42,6 +44,7 @@ final class TableScanContext {
   private final Long toSnapshotId;
   private final ExecutorService planExecutor;
   private final boolean fromSnapshotInclusive;
+  private final ScanReporter scanReporter;
 
   TableScanContext() {
     this.snapshotId = null;
@@ -56,6 +59,7 @@ final class TableScanContext {
     this.toSnapshotId = null;
     this.planExecutor = null;
     this.fromSnapshotInclusive = false;
+    this.scanReporter = new LoggingScanReporter();
   }
 
   private TableScanContext(
@@ -70,7 +74,8 @@ final class TableScanContext {
       Long fromSnapshotId,
       Long toSnapshotId,
       ExecutorService planExecutor,
-      boolean fromSnapshotInclusive) {
+      boolean fromSnapshotInclusive,
+      ScanReporter scanReporter) {
     this.snapshotId = snapshotId;
     this.rowFilter = rowFilter;
     this.ignoreResiduals = ignoreResiduals;
@@ -83,6 +88,7 @@ final class TableScanContext {
     this.toSnapshotId = toSnapshotId;
     this.planExecutor = planExecutor;
     this.fromSnapshotInclusive = fromSnapshotInclusive;
+    this.scanReporter = scanReporter;
   }
 
   Long snapshotId() {
@@ -102,7 +108,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   Expression rowFilter() {
@@ -122,7 +129,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   boolean ignoreResiduals() {
@@ -142,7 +150,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   boolean caseSensitive() {
@@ -162,7 +171,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   boolean returnColumnStats() {
@@ -182,7 +192,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   Collection<String> selectedColumns() {
@@ -204,7 +215,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   Schema projectedSchema() {
@@ -226,7 +238,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   Map<String, String> options() {
@@ -249,7 +262,8 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   Long fromSnapshotId() {
@@ -269,7 +283,8 @@ final class TableScanContext {
         id,
         toSnapshotId,
         planExecutor,
-        false);
+        false,
+        scanReporter);
   }
 
   TableScanContext fromSnapshotIdInclusive(long id) {
@@ -285,7 +300,8 @@ final class TableScanContext {
         id,
         toSnapshotId,
         planExecutor,
-        true);
+        true,
+        scanReporter);
   }
 
   boolean fromSnapshotInclusive() {
@@ -309,7 +325,8 @@ final class TableScanContext {
         fromSnapshotId,
         id,
         planExecutor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
   }
 
   ExecutorService planExecutor() {
@@ -333,6 +350,28 @@ final class TableScanContext {
         fromSnapshotId,
         toSnapshotId,
         executor,
-        fromSnapshotInclusive);
+        fromSnapshotInclusive,
+        scanReporter);
+  }
+
+  ScanReporter scanReporter() {
+    return scanReporter;
+  }
+
+  TableScanContext reportWith(ScanReporter reporter) {
+    return new TableScanContext(
+        snapshotId,
+        rowFilter,
+        ignoreResiduals,
+        caseSensitive,
+        colStats,
+        projectedSchema,
+        selectedColumns,
+        options,
+        fromSnapshotId,
+        toSnapshotId,
+        planExecutor,
+        fromSnapshotInclusive,
+        reporter);
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index c7892a7e7f..9da1efef69 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -56,6 +56,8 @@ import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.ResolvingFileIO;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -102,6 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
   private ResourcePaths paths = null;
   private Object conf = null;
   private FileIO io = null;
+  private ScanReporter scanReporter = null;
 
   // a lazy thread pool for token refresh
   private volatile ScheduledExecutorService refreshExecutor = null;
@@ -171,6 +174,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     this.io =
         CatalogUtil.loadFileIO(
             ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), 
mergedProps, conf);
+    this.scanReporter = new LoggingScanReporter();
 
     super.initialize(name, mergedProps);
   }
@@ -280,7 +284,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
             tableFileIO(response.config()),
             response.tableMetadata());
 
-    BaseTable table = new BaseTable(ops, fullTableName(loadedIdent));
+    BaseTable table = new BaseTable(ops, fullTableName(loadedIdent), this.scanReporter);
     if (metadataType != null) {
       return MetadataTableUtils.createMetadataTableInstance(table, 
metadataType);
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java 
b/core/src/test/java/org/apache/iceberg/TestTables.java
index 15ad6b900e..21b2c3fa58 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.metrics.ScanReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -71,6 +72,27 @@ public class TestTables {
     return new TestTable(ops, name);
   }
 
+  public static TestTable create(
+      File temp,
+      String name,
+      Schema schema,
+      PartitionSpec spec,
+      SortOrder sortOrder,
+      int formatVersion,
+      ScanReporter scanReporter) {
+    TestTableOperations ops = new TestTableOperations(name, temp);
+    if (ops.current() != null) {
+      throw new AlreadyExistsException("Table %s already exists at location: 
%s", name, temp);
+    }
+
+    ops.commit(
+        null,
+        newTableMetadata(
+            schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), 
formatVersion));
+
+    return new TestTable(ops, name, scanReporter);
+  }
+
   public static Transaction beginCreate(File temp, String name, Schema schema, 
PartitionSpec spec) {
     return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
   }
@@ -158,6 +180,11 @@ public class TestTables {
       this.ops = ops;
     }
 
+    private TestTable(TestTableOperations ops, String name, ScanReporter 
scanReporter) {
+      super(ops, name, scanReporter);
+      this.ops = ops;
+    }
+
     TestTableOperations ops() {
       return ops;
     }
diff --git 
a/data/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java 
b/data/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
new file mode 100644
index 0000000000..84e651bcbf
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReporter;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
+
+public class TestScanPlanningAndReporting extends TableTestBase {
+
+  private final TestScanReporter reporter = new TestScanReporter();
+
+  public TestScanPlanningAndReporting() {
+    super(2);
+  }
+
+  @Test
+  public void testScanPlanningWithReport() throws IOException {
+    String tableName = "simple-scan-planning";
+    Table table = createTableWithCustomRecords(tableName);
+    TableScan tableScan = table.newScan();
+
+    // should be 3 files
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+      fileScanTasks.forEach(task -> {});
+    }
+
+    ScanReport scanReport = reporter.lastReport();
+    assertThat(scanReport).isNotNull();
+
+    assertThat(scanReport.tableName()).isEqualTo(tableName);
+    assertThat(scanReport.snapshotId()).isEqualTo(1L);
+    assertThat(scanReport.filter()).isEqualTo(Expressions.alwaysTrue());
+    
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isGreaterThan(Duration.ZERO);
+    
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(3);
+    
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(1850L);
+    
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+
+    // should be 1 file
+    try (CloseableIterable<FileScanTask> fileScanTasks =
+        tableScan.filter(Expressions.lessThan("x", "30")).planFiles()) {
+      fileScanTasks.forEach(task -> {});
+    }
+
+    scanReport = reporter.lastReport();
+    assertThat(scanReport).isNotNull();
+    assertThat(scanReport.tableName()).isEqualTo(tableName);
+    assertThat(scanReport.snapshotId()).isEqualTo(1L);
+    
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isGreaterThan(Duration.ZERO);
+    
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(616L);
+    
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+  }
+
+  private Table createTableWithCustomRecords(String tableName) throws 
IOException {
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()), required(2, "x", 
Types.StringType.get()));
+
+    Table table =
+        TestTables.create(
+            tableDir,
+            tableName,
+            schema,
+            PartitionSpec.builderFor(schema).build(),
+            SortOrder.unsorted(),
+            formatVersion,
+            reporter);
+    GenericRecord record = GenericRecord.create(schema);
+    record.setField("id", 1);
+    record.setField("x", "23");
+
+    GenericRecord record2 = record.copy(ImmutableMap.of("id", 2, "x", "30"));
+    GenericRecord record3 = record.copy(ImmutableMap.of("id", 3, "x", "45"));
+    GenericRecord record4 = record.copy(ImmutableMap.of("id", 4, "x", "51"));
+
+    DataFile dataFile = writeParquetFile(table, Arrays.asList(record, 
record3));
+    DataFile dataFile2 = writeParquetFile(table, Arrays.asList(record2));
+    DataFile dataFile3 = writeParquetFile(table, Arrays.asList(record4));
+    
table.newFastAppend().appendFile(dataFile).appendFile(dataFile2).appendFile(dataFile3).commit();
+    return table;
+  }
+
+  @Test
+  public void deleteScanning() throws IOException {
+    Table table =
+        TestTables.create(
+            tableDir,
+            "scan-planning-with-deletes",
+            SCHEMA,
+            SPEC,
+            SortOrder.unsorted(),
+            formatVersion,
+            reporter);
+
+    
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
+    
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit();
+    TableScan tableScan = table.newScan();
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+      fileScanTasks.forEach(task -> {});
+    }
+
+    ScanReport scanReport = reporter.lastReport();
+    assertThat(scanReport).isNotNull();
+    assertThat(scanReport.tableName()).isEqualTo("scan-planning-with-deletes");
+    assertThat(scanReport.snapshotId()).isEqualTo(2L);
+    
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isGreaterThan(Duration.ZERO);
+    
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(3);
+    
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(2);
+    
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(30L);
+    
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(20L);
+  }
+
+  @Test
+  public void multipleDataManifests() throws IOException {
+    Table table =
+        TestTables.create(
+            tableDir,
+            "multiple-data-manifests",
+            SCHEMA,
+            SPEC,
+            SortOrder.unsorted(),
+            formatVersion,
+            reporter);
+
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    TableScan tableScan = table.newScan();
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+      fileScanTasks.forEach(task -> {});
+    }
+
+    ScanReport scanReport = reporter.lastReport();
+    assertThat(scanReport).isNotNull();
+    assertThat(scanReport.tableName()).isEqualTo("multiple-data-manifests");
+    assertThat(scanReport.snapshotId()).isEqualTo(2L);
+    
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isGreaterThan(Duration.ZERO);
+    
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(4);
+    
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(2);
+    
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(2);
+    
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(40L);
+    
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+
+    // we should hit only a single data manifest and only a single data file
+    try (CloseableIterable<FileScanTask> fileScanTasks =
+        tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
+      fileScanTasks.forEach(task -> {});
+    }
+
+    scanReport = reporter.lastReport();
+    assertThat(scanReport).isNotNull();
+    assertThat(scanReport.tableName()).isEqualTo("multiple-data-manifests");
+    assertThat(scanReport.snapshotId()).isEqualTo(2L);
+    
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+        .isGreaterThan(Duration.ZERO);
+    
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(1);
+    
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(2);
+    
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+    
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(10L);
+    
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+  }
+
+  private DataFile writeParquetFile(Table table, List<GenericRecord> records) 
throws IOException {
+    File parquetFile = temp.newFile();
+    assertTrue(parquetFile.delete());
+    FileAppender<GenericRecord> appender =
+        Parquet.write(Files.localOutput(parquetFile))
+            .schema(table.schema())
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .build();
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }
+
+    return DataFiles.builder(table.spec())
+        .withInputFile(localInput(parquetFile))
+        .withMetrics(appender.metrics())
+        .withFormat(FileFormat.PARQUET)
+        .build();
+  }
+
+  private static class TestScanReporter implements ScanReporter {
+    private final List<ScanReport> reports = Lists.newArrayList();
+    // this is mainly so that we see scan reports being logged during tests
+    private final LoggingScanReporter delegate = new LoggingScanReporter();
+
+    @Override
+    public void reportScan(ScanReport scanReport) {
+      reports.add(scanReport);
+      delegate.reportScan(scanReport);
+    }
+
+    public ScanReport lastReport() {
+      if (reports.isEmpty()) {
+        return null;
+      }
+      return reports.get(reports.size() - 1);
+    }
+  }
+}

Reply via email to