This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 305ab1544c API, Core: Add scan metrics reporter and logging (#5268)
305ab1544c is described below
commit 305ab1544c7a7b844386fe9aca7ce9951936c021
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Wed Aug 3 20:29:07 2022 +0200
API, Core: Add scan metrics reporter and logging (#5268)
---
.../org/apache/iceberg/io/CloseableIterable.java | 90 +++++++
.../org/apache/iceberg/io/CloseableIterator.java | 23 ++
.../org/apache/iceberg/metrics/IntCounter.java | 19 ++
.../iceberg/metrics/LoggingScanReporter.java | 36 +++
.../org/apache/iceberg/metrics/LongCounter.java | 19 ++
.../org/apache/iceberg/metrics/MetricsContext.java | 15 +-
.../org/apache/iceberg/metrics/ScanReport.java | 216 +++++++++++++++++
.../org/apache/iceberg/metrics/ScanReporter.java | 37 +++
.../apache/iceberg/io/TestCloseableIterable.java | 129 ++++++++++
.../org/apache/iceberg/metrics/TestScanReport.java | 132 ++++++++++
.../main/java/org/apache/iceberg/BaseTable.java | 15 +-
.../java/org/apache/iceberg/BaseTableScan.java | 31 ++-
.../java/org/apache/iceberg/DataTableScan.java | 10 +-
.../java/org/apache/iceberg/DeleteFileIndex.java | 7 +
.../java/org/apache/iceberg/ManifestGroup.java | 53 ++++-
.../java/org/apache/iceberg/ManifestReader.java | 7 +
.../java/org/apache/iceberg/TableScanContext.java | 65 ++++-
.../apache/iceberg/rest/RESTSessionCatalog.java | 6 +-
.../test/java/org/apache/iceberg/TestTables.java | 27 +++
.../iceberg/TestScanPlanningAndReporting.java | 265 +++++++++++++++++++++
20 files changed, 1164 insertions(+), 38 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 068584689e..7cea2c9d1e 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -26,6 +26,7 @@ import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -73,6 +74,36 @@ public interface CloseableIterable<T> extends Iterable<T>,
Closeable {
};
}
+  /**
+   * Returns a view of the given iterable that runs a callback once {@link
+   * CloseableIterable#close()} has been called on it.
+   *
+   * <p>The callback is invoked from a {@code finally} block, so it also runs when closing the
+   * underlying iterable throws. It is only triggered by {@code close()}; exhausting an iterator
+   * does not run it.
+   *
+   * @param iterable The underlying {@link CloseableIterable} to iterate over
+   * @param onCompletionRunnable The runnable to run after the underlying iterable was closed
+   * @param <E> The type of the underlying iterable
+   * @return A new {@link CloseableIterable} where the runnable will be executed as the final step
+   *     after {@link CloseableIterable#close()} has been called
+   * @throws NullPointerException if {@code iterable} or {@code onCompletionRunnable} is null
+   */
+  static <E> CloseableIterable<E> whenComplete(
+      CloseableIterable<E> iterable, Runnable onCompletionRunnable) {
+    // Fail fast: without this check a null iterable only surfaces later, as a message-less NPE
+    // thrown from iterator() or close().
+    Preconditions.checkNotNull(iterable, "Cannot wrap a null iterable");
+    Preconditions.checkNotNull(
+        onCompletionRunnable, "Cannot execute a null Runnable after completion");
+    return new CloseableIterable<E>() {
+      @Override
+      public void close() throws IOException {
+        try {
+          iterable.close();
+        } finally {
+          // runs even if closing the underlying iterable failed
+          onCompletionRunnable.run();
+        }
+      }
+
+      @Override
+      public CloseableIterator<E> iterator() {
+        return CloseableIterator.withClose(iterable.iterator());
+      }
+    };
+  }
+
static <E> CloseableIterable<E> filter(CloseableIterable<E> iterable,
Predicate<E> pred) {
return combine(
() ->
@@ -85,6 +116,65 @@ public interface CloseableIterable<T> extends Iterable<T>,
Closeable {
iterable);
}
+  /**
+   * Filters the given {@link CloseableIterable} and records every element rejected by the
+   * predicate in the given {@link MetricsContext.Counter}.
+   *
+   * @param skipCounter The {@link MetricsContext.Counter} instance to increment on each skipped
+   *     item during filtering.
+   * @param iterable The underlying {@link CloseableIterable} to filter.
+   * @param pred The predicate each element is tested with; an element that does not match is
+   *     counted as skipped.
+   * @param <E> The underlying type to be iterated.
+   * @return A filtered {@link CloseableIterable} where the given skipCounter is incremented
+   *     whenever the predicate does not match.
+   */
+  static <E> CloseableIterable<E> filter(
+      MetricsContext.Counter<?> skipCounter, CloseableIterable<E> iterable, Predicate<E> pred) {
+    Preconditions.checkArgument(skipCounter != null, "Invalid counter: null");
+    Preconditions.checkArgument(iterable != null, "Invalid iterable: null");
+    Preconditions.checkArgument(pred != null, "Invalid predicate: null");
+    return combine(
+        () ->
+            new FilterIterator<E>(iterable.iterator()) {
+              @Override
+              protected boolean shouldKeep(E item) {
+                if (pred.test(item)) {
+                  return true;
+                }
+
+                // only rejected items are counted
+                skipCounter.increment();
+                return false;
+              }
+            },
+        iterable);
+  }
+
+  /**
+   * Wraps the given {@link CloseableIterable} so that the {@link MetricsContext.Counter} instance
+   * is incremented for each {@link Iterator#next()} call on the iterators it hands out.
+   *
+   * @param counter The {@link MetricsContext.Counter} instance to increment on each {@link
+   *     Iterator#next()} call.
+   * @param iterable The underlying {@link CloseableIterable} to count
+   * @param <T> The underlying type to be iterated.
+   * @return A {@link CloseableIterable} that increments the given counter on each {@link
+   *     Iterator#next()} call.
+   */
+  static <T> CloseableIterable<T> count(
+      MetricsContext.Counter<?> counter, CloseableIterable<T> iterable) {
+    Preconditions.checkArgument(counter != null, "Invalid counter: null");
+    Preconditions.checkArgument(iterable != null, "Invalid iterable: null");
+    return new CloseableIterable<T>() {
+      @Override
+      public void close() throws IOException {
+        iterable.close();
+      }
+
+      @Override
+      public CloseableIterator<T> iterator() {
+        CloseableIterator<T> delegate = iterable.iterator();
+        return CloseableIterator.count(counter, delegate);
+      }
+    };
+  }
+
static <I, O> CloseableIterable<O> transform(
CloseableIterable<I> iterable, Function<I, O> transform) {
Preconditions.checkNotNull(transform, "Cannot apply a null transform");
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
b/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
index 3a79b84353..1661acdcda 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
+import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public interface CloseableIterator<T> extends Iterator<T>, Closeable {
@@ -77,4 +78,26 @@ public interface CloseableIterator<T> extends Iterator<T>,
Closeable {
}
};
}
+
+  /**
+   * Wraps the given iterator so that the counter is incremented for every element returned by
+   * {@link Iterator#next()}.
+   *
+   * <p>The counter is incremented only after the underlying {@code next()} call has returned, so
+   * a {@code next()} call that throws is not counted. {@code hasNext()} and {@code close()} are
+   * delegated unchanged.
+   *
+   * @param counter The {@link MetricsContext.Counter} to increment for each returned element
+   * @param iterator The underlying {@link CloseableIterator}
+   * @param <T> The underlying type to be iterated
+   * @return A {@link CloseableIterator} that counts the elements it hands out
+   * @throws IllegalArgumentException if the counter or the iterator is null
+   */
+  static <T> CloseableIterator<T> count(
+      MetricsContext.Counter<?> counter, CloseableIterator<T> iterator) {
+    // Validate eagerly; otherwise a null argument only fails on the first use of the wrapper.
+    Preconditions.checkArgument(null != counter, "Invalid counter: null");
+    Preconditions.checkArgument(null != iterator, "Invalid iterator: null");
+    return new CloseableIterator<T>() {
+      @Override
+      public void close() throws IOException {
+        iterator.close();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public T next() {
+        T next = iterator.next();
+        counter.increment();
+        return next;
+      }
+    };
+  }
}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
b/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
index 33b79920d3..4c9724e2a9 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/IntCounter.java
@@ -27,6 +27,25 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
* {@link Integer} to count events.
*/
class IntCounter implements MetricsContext.Counter<Integer> {
+  /** Shared counter that ignores every increment and always reports a value of zero. */
+  static final IntCounter NOOP =
+      new IntCounter("NOOP", MetricsContext.Unit.UNDEFINED) {
+        @Override
+        public Integer value() {
+          return 0;
+        }
+
+        @Override
+        public Optional<Integer> count() {
+          return Optional.of(value());
+        }
+
+        @Override
+        public void increment(Integer amount) {
+          // deliberately ignored
+        }
+
+        @Override
+        public void increment() {
+          // deliberately ignored
+        }
+      };
+
private final AtomicInteger counter;
private final String name;
private final MetricsContext.Unit unit;
diff --git
a/api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java
b/api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java
new file mode 100644
index 0000000000..2c9d9f1ad6
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A default {@link ScanReporter} implementation that writes each {@link ScanReport} it receives
+ * to the log at INFO level.
+ */
+public class LoggingScanReporter implements ScanReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingScanReporter.class);
+
+  /**
+   * Logs the given report.
+   *
+   * @param scanReport The {@link ScanReport} to log, must not be null
+   */
+  @Override
+  public void reportScan(ScanReport scanReport) {
+    Preconditions.checkArgument(scanReport != null, "Invalid scan report: null");
+    LOG.info("Completed scan planning: {}", scanReport);
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
b/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
index 46d8772de0..071ca825e4 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/LongCounter.java
@@ -27,6 +27,25 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
* {@link Long} to count events.
*/
class LongCounter implements MetricsContext.Counter<Long> {
+  /** Shared counter that ignores every increment and always reports a value of zero. */
+  static final LongCounter NOOP =
+      new LongCounter("NOOP", MetricsContext.Unit.UNDEFINED) {
+        @Override
+        public Long value() {
+          return 0L;
+        }
+
+        @Override
+        public Optional<Long> count() {
+          return Optional.of(value());
+        }
+
+        @Override
+        public void increment(Long amount) {
+          // deliberately ignored
+        }
+
+        @Override
+        public void increment() {
+          // deliberately ignored
+        }
+      };
+
private final AtomicLong counter;
private final String name;
private final MetricsContext.Unit unit;
diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
b/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
index f30cf78740..68aed0f763 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java
@@ -135,13 +135,16 @@ public interface MetricsContext extends Serializable {
@Override
public <T extends Number> Counter<T> counter(String name, Class<T> type,
Unit unit) {
- return new Counter<T>() {
- @Override
- public void increment() {}
+ if (Integer.class.equals(type)) {
+ return (Counter<T>) IntCounter.NOOP;
+ }
- @Override
- public void increment(T amount) {}
- };
+ if (Long.class.equals(type)) {
+ return (Counter<T>) LongCounter.NOOP;
+ }
+
+ throw new IllegalArgumentException(
+ String.format("Counter for type %s is not supported",
type.getName()));
}
};
}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
b/api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
new file mode 100644
index 0000000000..2eded304e8
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.MetricsContext.Counter;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A Table Scan report that contains all relevant information from a Table Scan: the table name,
+ * the snapshot id, the filter expression, the projected schema and the collected {@link
+ * ScanMetrics}.
+ *
+ * <p>Instances are created through {@link #builder()}.
+ *
+ * <p>NOTE(review): this class is declared {@link Serializable} while the nested {@link
+ * ScanMetrics} type it holds in a field is not; confirm whether Java serialization of a report is
+ * expected to work.
+ */
+public class ScanReport implements Serializable {
+
+ private final String tableName;
+ private final long snapshotId;
+ private final Expression filter;
+ private final Schema projection;
+ private final ScanMetrics scanMetrics;
+
+ // private: reports are only created by Builder#build(), which validates the fields first
+ private ScanReport(
+ String tableName,
+ long snapshotId,
+ Expression filter,
+ Schema projection,
+ ScanMetrics scanMetrics) {
+ this.tableName = tableName;
+ this.snapshotId = snapshotId;
+ this.filter = filter;
+ this.projection = projection;
+ this.scanMetrics = scanMetrics;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public Expression filter() {
+ return filter;
+ }
+
+ public Schema projection() {
+ return projection;
+ }
+
+ public ScanMetrics scanMetrics() {
+ return scanMetrics;
+ }
+
+ /** Returns a new, empty {@link Builder}. */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("tableName", tableName)
+ .add("snapshotId", snapshotId)
+ .add("filter", filter)
+ .add("projection", projection)
+ .add("scanMetrics", scanMetrics)
+ .toString();
+ }
+
+ /** Collects the fields of a {@link ScanReport} and validates them in {@link #build()}. */
+ public static class Builder {
+ private String tableName;
+ // -1 is what the report carries when withSnapshotId was never called
+ private long snapshotId = -1L;
+ private Expression filter;
+ private Schema projection;
+ private ScanMetrics scanMetrics;
+
+ private Builder() {}
+
+ public Builder withTableName(String newTableName) {
+ this.tableName = newTableName;
+ return this;
+ }
+
+ public Builder withSnapshotId(long newSnapshotId) {
+ this.snapshotId = newSnapshotId;
+ return this;
+ }
+
+ public Builder withFilter(Expression newFilter) {
+ this.filter = newFilter;
+ return this;
+ }
+
+ public Builder withProjection(Schema newProjection) {
+ this.projection = newProjection;
+ return this;
+ }
+
+ public Builder fromScanMetrics(ScanMetrics newScanMetrics) {
+ this.scanMetrics = newScanMetrics;
+ return this;
+ }
+
+ /**
+ * Creates the {@link ScanReport}. Table name, filter, projection and scan metrics are checked
+ * for null in that order; the snapshot id is optional.
+ *
+ * @return a new {@link ScanReport}
+ */
+ public ScanReport build() {
+ Preconditions.checkArgument(null != tableName, "Invalid table name:
null");
+ Preconditions.checkArgument(null != filter, "Invalid expression filter:
null");
+ Preconditions.checkArgument(null != projection, "Invalid schema
projection: null");
+ Preconditions.checkArgument(null != scanMetrics, "Invalid scan metrics:
null");
+ return new ScanReport(tableName, snapshotId, filter, projection,
scanMetrics);
+ }
+ }
+
+ /** Carries all metrics for a particular scan */
+ public static class ScanMetrics {
+ // metrics instance obtained from MetricsContext.nullMetrics()
+ public static final ScanMetrics NOOP = new
ScanMetrics(MetricsContext.nullMetrics());
+ private final Timer totalPlanningDuration;
+ private final Counter<Integer> resultDataFiles;
+ private final Counter<Integer> resultDeleteFiles;
+ private final Counter<Integer> totalDataManifests;
+ private final Counter<Integer> totalDeleteManifests;
+ private final Counter<Integer> scannedDataManifests;
+ private final Counter<Integer> skippedDataManifests;
+ private final Counter<Long> totalFileSizeInBytes;
+ private final Counter<Long> totalDeleteFileSizeInBytes;
+
+ /**
+ * Obtains one timer and eight counters from the given context; each metric is requested under
+ * the same name as the field that holds it.
+ *
+ * @param metricsContext the {@link MetricsContext} that supplies the timer and counters, must
+ * not be null
+ */
+ public ScanMetrics(MetricsContext metricsContext) {
+ Preconditions.checkArgument(null != metricsContext, "Invalid metrics
context: null");
+ this.totalPlanningDuration =
+ metricsContext.timer("totalPlanningDuration", TimeUnit.NANOSECONDS);
+ this.resultDataFiles =
+ metricsContext.counter("resultDataFiles", Integer.class,
MetricsContext.Unit.COUNT);
+ this.resultDeleteFiles =
+ metricsContext.counter("resultDeleteFiles", Integer.class,
MetricsContext.Unit.COUNT);
+ this.scannedDataManifests =
+ metricsContext.counter("scannedDataManifests", Integer.class,
MetricsContext.Unit.COUNT);
+ this.totalDataManifests =
+ metricsContext.counter("totalDataManifests", Integer.class,
MetricsContext.Unit.COUNT);
+ this.totalDeleteManifests =
+ metricsContext.counter("totalDeleteManifests", Integer.class,
MetricsContext.Unit.COUNT);
+ this.totalFileSizeInBytes =
+ metricsContext.counter("totalFileSizeInBytes", Long.class,
MetricsContext.Unit.BYTES);
+ this.totalDeleteFileSizeInBytes =
+ metricsContext.counter(
+ "totalDeleteFileSizeInBytes", Long.class,
MetricsContext.Unit.BYTES);
+ this.skippedDataManifests =
+ metricsContext.counter("skippedDataManifests", Integer.class,
MetricsContext.Unit.COUNT);
+ }
+
+ public Timer totalPlanningDuration() {
+ return totalPlanningDuration;
+ }
+
+ public Counter<Integer> resultDataFiles() {
+ return resultDataFiles;
+ }
+
+ public Counter<Integer> resultDeleteFiles() {
+ return resultDeleteFiles;
+ }
+
+ public Counter<Integer> scannedDataManifests() {
+ return scannedDataManifests;
+ }
+
+ public Counter<Integer> totalDataManifests() {
+ return totalDataManifests;
+ }
+
+ public Counter<Integer> totalDeleteManifests() {
+ return totalDeleteManifests;
+ }
+
+ public Counter<Long> totalFileSizeInBytes() {
+ return totalFileSizeInBytes;
+ }
+
+ public Counter<Long> totalDeleteFileSizeInBytes() {
+ return totalDeleteFileSizeInBytes;
+ }
+
+ public Counter<Integer> skippedDataManifests() {
+ return skippedDataManifests;
+ }
+
+ @Override
+ public String toString() {
+ // values are added without a label, so the output relies on each metric's own toString()
+ return MoreObjects.toStringHelper(this)
+ .addValue(totalPlanningDuration)
+ .addValue(resultDataFiles)
+ .addValue(resultDeleteFiles)
+ .addValue(scannedDataManifests)
+ .addValue(skippedDataManifests)
+ .addValue(totalDataManifests)
+ .addValue(totalDeleteManifests)
+ .addValue(totalFileSizeInBytes)
+ .addValue(totalDeleteFileSizeInBytes)
+ .toString();
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/ScanReporter.java
b/api/src/main/java/org/apache/iceberg/metrics/ScanReporter.java
new file mode 100644
index 0000000000..958c03ef8f
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/metrics/ScanReporter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.metrics.ScanReport.ScanMetrics;
+
+/**
+ * This interface defines the basic API for a Table Scan Reporter that can be used to report
+ * different metrics after a Table scan is done.
+ */
+@FunctionalInterface
+public interface ScanReporter {
+
+ /**
+ * Indicates that a Scan is done by reporting a {@link ScanReport}. A {@link ScanReport} is
+ * usually directly derived from a {@link ScanMetrics} instance.
+ *
+ * @param scanReport The {@link ScanReport} to report.
+ */
+ void reportScan(ScanReport scanReport);
+}
diff --git a/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
b/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
index 826dcb4fbf..68895cfc43 100644
--- a/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
+++ b/api/src/test/java/org/apache/iceberg/io/TestCloseableIterable.java
@@ -19,12 +19,16 @@
package org.apache.iceberg.io;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import org.apache.iceberg.AssertHelpers;
import
org.apache.iceberg.io.TestableCloseableIterable.TestableCloseableIterator;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
@@ -93,6 +97,71 @@ public class TestCloseableIterable {
() -> Iterables.getLast(concat5));
}
+  /** The callback must not run during iteration and must run exactly once on close. */
+  @Test
+  public void testWithCompletionRunnable() throws IOException {
+    AtomicInteger callbackRuns = new AtomicInteger(0);
+    List<Integer> values = Lists.newArrayList(1, 2, 3, 4, 5);
+
+    // a null callback is rejected up front
+    Assertions.assertThatThrownBy(
+            () -> CloseableIterable.whenComplete(CloseableIterable.combine(values, () -> {}), null))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("Cannot execute a null Runnable after completion");
+
+    try (CloseableIterable<Integer> wrapped =
+        CloseableIterable.whenComplete(
+            CloseableIterable.combine(values, () -> {}), callbackRuns::incrementAndGet)) {
+      wrapped.forEach(ignored -> Assertions.assertThat(callbackRuns.get()).isEqualTo(0));
+    }
+
+    Assertions.assertThat(callbackRuns.get()).isEqualTo(1);
+  }
+
+  /** Closing a wrapper around an empty iterable still runs the callback exactly once. */
+  @Test
+  public void testWithCompletionRunnableAndEmptyIterable() throws IOException {
+    AtomicInteger callbackRuns = new AtomicInteger(0);
+    CloseableIterable<Integer> nothing = CloseableIterable.empty();
+
+    try (CloseableIterable<Integer> wrapped =
+        CloseableIterable.whenComplete(
+            CloseableIterable.combine(nothing, () -> {}), callbackRuns::incrementAndGet)) {
+      wrapped.forEach(ignored -> Assertions.assertThat(callbackRuns.get()).isEqualTo(0));
+    }
+
+    Assertions.assertThat(callbackRuns.get()).isEqualTo(1);
+  }
+
+  /** Without a call to close() the callback is never triggered, even after full iteration. */
+  @Test
+  public void testWithCompletionRunnableAndUnclosedIterable() {
+    AtomicInteger callbackRuns = new AtomicInteger(0);
+    List<Integer> values = Lists.newArrayList(1, 2, 3, 4, 5);
+
+    CloseableIterable<Integer> wrapped =
+        CloseableIterable.whenComplete(
+            CloseableIterable.combine(values, () -> {}), callbackRuns::incrementAndGet);
+    wrapped.forEach(ignored -> Assertions.assertThat(callbackRuns.get()).isEqualTo(0));
+
+    // the wrapper is intentionally left open, so the completion callback must not have run
+    Assertions.assertThat(callbackRuns.get()).isEqualTo(0);
+  }
+
+  /** The callback still runs once when closing the underlying iterable throws. */
+  @Test
+  public void testWithCompletionRunnableWhenIterableThrows() {
+    AtomicInteger callbackRuns = new AtomicInteger(0);
+    List<Integer> values = Lists.newArrayList(1, 2, 3, 4, 5);
+
+    Assertions.assertThatThrownBy(
+            () -> {
+              try (CloseableIterable<Integer> wrapped =
+                  CloseableIterable.whenComplete(
+                      CloseableIterable.combine(
+                          values,
+                          () -> {
+                            throw new RuntimeException("expected");
+                          }),
+                      callbackRuns::incrementAndGet)) {
+                wrapped.forEach(
+                    ignored -> Assertions.assertThat(callbackRuns.get()).isEqualTo(0));
+              }
+            })
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("expected");
+
+    Assertions.assertThat(callbackRuns.get()).isEqualTo(1);
+  }
+
@Test
public void testConcatWithEmpty() {
AtomicInteger counter = new AtomicInteger(0);
@@ -131,4 +200,64 @@ public class TestCloseableIterable {
}
Assertions.assertThat(counter.get()).isEqualTo(items.size());
}
+
+  /** Every element handed out by the counting iterable increments the counter once. */
+  @Test
+  public void count() {
+    MetricsContext.Counter<Integer> seen =
+        new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
+    CloseableIterable<Integer> counted =
+        CloseableIterable.count(
+            seen, CloseableIterable.withNoopClose(Arrays.asList(1, 2, 3, 4, 5)));
+
+    // nothing is counted before iteration starts
+    Assertions.assertThat(seen.value()).isEqualTo(0);
+    counted.forEach(ignored -> {});
+    Assertions.assertThat(seen.value()).isEqualTo(5);
+  }
+
+  /** Only the elements rejected by the predicate (1, 3 and 5) increment the skip counter. */
+  @Test
+  public void countSkipped() {
+    MetricsContext.Counter<Integer> skipped =
+        new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
+    CloseableIterable<Integer> evens =
+        CloseableIterable.filter(
+            skipped,
+            CloseableIterable.withNoopClose(Arrays.asList(1, 2, 3, 4, 5)),
+            x -> x % 2 == 0);
+
+    // nothing is counted before iteration starts
+    Assertions.assertThat(skipped.value()).isEqualTo(0);
+    evens.forEach(ignored -> {});
+    Assertions.assertThat(skipped.value()).isEqualTo(3);
+  }
+
+  /** CloseableIterable.count rejects a null counter and a null iterable. */
+  @Test
+  public void countNullCheck() {
+    Assertions.assertThatThrownBy(() -> CloseableIterable.count(null, CloseableIterable.empty()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid counter: null");
+
+    MetricsContext.Counter<Integer> validCounter =
+        new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
+
+    Assertions.assertThatThrownBy(() -> CloseableIterable.count(validCounter, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid iterable: null");
+  }
+
+  /** The counting filter rejects a null counter, a null iterable and a null predicate. */
+  @Test
+  public void countSkippedNullCheck() {
+    Assertions.assertThatThrownBy(
+            () ->
+                CloseableIterable.filter(null, CloseableIterable.empty(), Predicate.isEqual(true)))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid counter: null");
+
+    MetricsContext.Counter<Integer> validCounter =
+        new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
+
+    Assertions.assertThatThrownBy(
+            () -> CloseableIterable.filter(validCounter, null, Predicate.isEqual(true)))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid iterable: null");
+
+    Assertions.assertThatThrownBy(
+            () -> CloseableIterable.filter(validCounter, CloseableIterable.empty(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid predicate: null");
+  }
}
diff --git a/api/src/test/java/org/apache/iceberg/metrics/TestScanReport.java
b/api/src/test/java/org/apache/iceberg/metrics/TestScanReport.java
new file mode 100644
index 0000000000..ed3bce44de
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/metrics/TestScanReport.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.True;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/** Tests for building a {@link ScanReport} and for the metrics carried by it. */
+public class TestScanReport {
+
+  /** Creates the single-column schema that the tests use as projection. */
+  private static Schema singleColumnSchema() {
+    return new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
+  }
+
+  /** The builder reports the first missing mandatory field, in declaration order. */
+  @Test
+  public void missingFields() {
+    Assertions.assertThatThrownBy(() -> ScanReport.builder().build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid table name: null");
+
+    Assertions.assertThatThrownBy(() -> ScanReport.builder().withTableName("x").build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expression filter: null");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                ScanReport.builder()
+                    .withTableName("x")
+                    .withFilter(Expressions.alwaysTrue())
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid schema projection: null");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                ScanReport.builder()
+                    .withTableName("x")
+                    .withFilter(Expressions.alwaysTrue())
+                    .withProjection(singleColumnSchema())
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid scan metrics: null");
+  }
+
+  /** A report built from the no-op metrics exposes zero values and the default snapshot id. */
+  @Test
+  public void fromEmptyScanMetrics() {
+    String tableName = "x";
+    True filter = Expressions.alwaysTrue();
+    Schema projection = singleColumnSchema();
+
+    ScanReport report =
+        ScanReport.builder()
+            .withTableName(tableName)
+            .withFilter(filter)
+            .withProjection(projection)
+            .fromScanMetrics(ScanReport.ScanMetrics.NOOP)
+            .build();
+
+    Assertions.assertThat(report.tableName()).isEqualTo(tableName);
+    Assertions.assertThat(report.projection()).isEqualTo(projection);
+    Assertions.assertThat(report.filter()).isEqualTo(filter);
+    Assertions.assertThat(report.snapshotId()).isEqualTo(-1);
+
+    ScanReport.ScanMetrics metrics = report.scanMetrics();
+    Assertions.assertThat(metrics.totalPlanningDuration().totalDuration())
+        .isEqualTo(Duration.ZERO);
+    Assertions.assertThat(metrics.resultDataFiles().value()).isEqualTo(0);
+    Assertions.assertThat(metrics.totalDataManifests().value()).isEqualTo(0);
+    Assertions.assertThat(metrics.scannedDataManifests().value()).isEqualTo(0);
+    Assertions.assertThat(metrics.totalFileSizeInBytes().value()).isEqualTo(0L);
+  }
+
+  /** A report built from recorded metrics exposes exactly the recorded values. */
+  @Test
+  public void fromScanMetrics() {
+    ScanReport.ScanMetrics recorded = new ScanReport.ScanMetrics(new DefaultMetricsContext());
+    recorded.totalPlanningDuration().record(10, TimeUnit.MINUTES);
+    recorded.resultDataFiles().increment(5);
+    recorded.resultDeleteFiles().increment(5);
+    recorded.scannedDataManifests().increment(5);
+    recorded.totalFileSizeInBytes().increment(1024L);
+    recorded.totalDataManifests().increment(5);
+
+    String tableName = "x";
+    True filter = Expressions.alwaysTrue();
+    Schema projection = singleColumnSchema();
+
+    ScanReport report =
+        ScanReport.builder()
+            .withTableName(tableName)
+            .withFilter(filter)
+            .withProjection(projection)
+            .withSnapshotId(23L)
+            .fromScanMetrics(recorded)
+            .build();
+
+    Assertions.assertThat(report.tableName()).isEqualTo(tableName);
+    Assertions.assertThat(report.projection()).isEqualTo(projection);
+    Assertions.assertThat(report.filter()).isEqualTo(filter);
+    Assertions.assertThat(report.snapshotId()).isEqualTo(23L);
+
+    ScanReport.ScanMetrics metrics = report.scanMetrics();
+    Assertions.assertThat(metrics.totalPlanningDuration().totalDuration())
+        .isEqualTo(Duration.ofMinutes(10L));
+    Assertions.assertThat(metrics.resultDataFiles().value()).isEqualTo(5);
+    Assertions.assertThat(metrics.resultDeleteFiles().value()).isEqualTo(5);
+    Assertions.assertThat(metrics.scannedDataManifests().value()).isEqualTo(5);
+    Assertions.assertThat(metrics.totalDataManifests().value()).isEqualTo(5);
+    Assertions.assertThat(metrics.totalFileSizeInBytes().value()).isEqualTo(1024L);
+  }
+
+  /** Scan metrics cannot be created without a metrics context. */
+  @Test
+  public void nullScanMetrics() {
+    Assertions.assertThatThrownBy(() -> new ScanReport.ScanMetrics(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid metrics context: null");
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 00842f6d77..9605b07d8d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReporter;
/**
* Base {@link Table} implementation.
@@ -37,10 +39,18 @@ import org.apache.iceberg.io.LocationProvider;
public class BaseTable implements Table, HasTableOperations, Serializable {
private final TableOperations ops;
private final String name;
+ private final ScanReporter scanReporter;
public BaseTable(TableOperations ops, String name) {
this.ops = ops;
this.name = name;
+ this.scanReporter = new LoggingScanReporter();
+ }
+
+ public BaseTable(TableOperations ops, String name, ScanReporter
scanReporter) {
+ this.ops = ops;
+ this.name = name;
+ this.scanReporter = scanReporter;
}
@Override
@@ -60,12 +70,13 @@ public class BaseTable implements Table,
HasTableOperations, Serializable {
@Override
public TableScan newScan() {
- return new DataTableScan(ops, this);
+ return new DataTableScan(ops, this, schema(), new
TableScanContext().reportWith(scanReporter));
}
@Override
public IncrementalAppendScan newIncrementalAppendScan() {
- return new BaseIncrementalAppendScan(ops, this);
+ return new BaseIncrementalAppendScan(
+ ops, this, schema(), new TableScanContext().reportWith(scanReporter));
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 98a7df0325..5f48786f5d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -25,6 +25,9 @@ import org.apache.iceberg.events.ScanEvent;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.DateTimeUtil;
@@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask,
CombinedScanTask>
implements TableScan {
private static final Logger LOG =
LoggerFactory.getLogger(BaseTableScan.class);
+ private ScanReport.ScanMetrics scanMetrics;
protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
this(ops, table, schema, new TableScanContext());
@@ -69,6 +73,14 @@ abstract class BaseTableScan extends BaseScan<TableScan,
FileScanTask, CombinedS
protected abstract CloseableIterable<FileScanTask> doPlanFiles();
+ /**
+  * Returns the {@link ScanReport.ScanMetrics} of this scan.
+  *
+  * <p>The metrics are created on first access, backed by a new {@link DefaultMetricsContext}, and
+  * the same instance is returned on every later call.
+  *
+  * @return the scan metrics held by this scan instance
+  */
+ protected ScanReport.ScanMetrics scanMetrics() {
+ // NOTE(review): nothing here resets the field, so counters presumably keep adding up if the same
+ // scan instance is planned more than once; confirm against planFiles() callers
+ if (scanMetrics == null) {
+ this.scanMetrics = new ScanReport.ScanMetrics(new
DefaultMetricsContext());
+ }
+
+ return scanMetrics;
+ }
+
@Override
public Table table() {
return super.table();
@@ -121,9 +133,22 @@ abstract class BaseTableScan extends BaseScan<TableScan,
FileScanTask, CombinedS
ExpressionUtil.toSanitizedString(filter()));
Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(),
filter(), schema()));
-
- return doPlanFiles();
-
+ Timer.Timed scanDuration = scanMetrics().totalPlanningDuration().start();
+
+ return CloseableIterable.whenComplete(
+ doPlanFiles(),
+ () -> {
+ scanDuration.stop();
+ ScanReport scanReport =
+ ScanReport.builder()
+ .withFilter(ExpressionUtil.sanitize(filter()))
+ .withProjection(schema())
+ .withTableName(table().name())
+ .withSnapshotId(snapshot.snapshotId())
+ .fromScanMetrics(scanMetrics())
+ .build();
+ context().scanReporter().reportScan(scanReport);
+ });
} else {
LOG.info("Scanning empty table {}", table());
return CloseableIterable.empty();
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 46ae8ae0ed..678dd8884a 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg;
+import java.util.List;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -104,19 +105,24 @@ public class DataTableScan extends BaseTableScan {
Snapshot snapshot = snapshot();
FileIO io = table().io();
+ List<ManifestFile> dataManifests = snapshot.dataManifests(io);
+ List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);
+ scanMetrics().totalDataManifests().increment(dataManifests.size());
+ scanMetrics().totalDeleteManifests().increment(deleteManifests.size());
ManifestGroup manifestGroup =
- new ManifestGroup(io, snapshot.dataManifests(io),
snapshot.deleteManifests(io))
+ new ManifestGroup(io, dataManifests, deleteManifests)
.caseSensitive(isCaseSensitive())
.select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(filter())
.specsById(table().specs())
+ .scanMetrics(scanMetrics())
.ignoreDeleted();
if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}
- if (snapshot.dataManifests(io).size() > 1
+ if (dataManifests.size() > 1
&& (PLAN_SCANS_WITH_WORKER_POOL ||
context().planWithCustomizedExecutor())) {
manifestGroup = manifestGroup.planWith(planExecutor());
}
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 877c83cf8a..58cadf474b 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -363,6 +364,7 @@ class DeleteFileIndex {
private PartitionSet partitionSet = null;
private boolean caseSensitive = true;
private ExecutorService executorService = null;
+ private ScanReport.ScanMetrics scanMetrics = ScanReport.ScanMetrics.NOOP;
Builder(FileIO io, Set<ManifestFile> deleteManifests) {
this.io = io;
@@ -404,6 +406,11 @@ class DeleteFileIndex {
return this;
}
+ /**
+  * Sets the scan metrics held by this builder, replacing the {@code ScanReport.ScanMetrics.NOOP}
+  * default.
+  *
+  * @param newScanMetrics metrics instance to store on the builder
+  * @return this builder, for call chaining
+  */
+ Builder scanMetrics(ScanReport.ScanMetrics newScanMetrics) {
+ this.scanMetrics = newScanMetrics;
+ return this;
+ }
+
DeleteFileIndex build() {
// read all of the matching delete manifests in parallel and accumulate
the matching files in
// a queue
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index dd814b0aec..beceefe89a 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -60,6 +61,7 @@ class ManifestGroup {
private List<String> columns;
private boolean caseSensitive;
private ExecutorService executorService;
+ private ScanReport.ScanMetrics scanMetrics;
ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(
@@ -83,6 +85,7 @@ class ManifestGroup {
this.caseSensitive = true;
this.manifestPredicate = m -> true;
this.manifestEntryPredicate = e -> true;
+ this.scanMetrics = ScanReport.ScanMetrics.NOOP;
}
ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
@@ -119,6 +122,11 @@ class ManifestGroup {
return this;
}
+ /**
+  * Sets the scan metrics this group records into, replacing the {@code
+  * ScanReport.ScanMetrics.NOOP} default assigned in the constructor.
+  *
+  * @param metrics metrics instance to use for this group
+  * @return this group, for call chaining
+  */
+ ManifestGroup scanMetrics(ScanReport.ScanMetrics metrics) {
+ this.scanMetrics = metrics;
+ return this;
+ }
+
ManifestGroup ignoreDeleted() {
this.ignoreDeleted = true;
return this;
@@ -171,7 +179,7 @@ class ManifestGroup {
return ResidualEvaluator.of(spec, filter, caseSensitive);
});
- DeleteFileIndex deleteFiles = deleteIndexBuilder.build();
+ DeleteFileIndex deleteFiles =
deleteIndexBuilder.scanMetrics(scanMetrics).build();
boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
if (!deleteFiles.isEmpty()) {
@@ -184,7 +192,7 @@ class ManifestGroup {
specId -> {
PartitionSpec spec = specsById.get(specId);
ResidualEvaluator residuals = residualCache.get(specId);
- return new TaskContext(spec, deleteFiles, residuals,
dropStats);
+ return new TaskContext(spec, deleteFiles, residuals,
dropStats, scanMetrics);
});
Iterable<CloseableIterable<T>> tasks =
@@ -239,11 +247,14 @@ class ManifestGroup {
evaluator = null;
}
- Iterable<ManifestFile> matchingManifests =
+ CloseableIterable<ManifestFile> closeableDataManifests =
+ CloseableIterable.withNoopClose(dataManifests);
+ CloseableIterable<ManifestFile> matchingManifests =
evalCache == null
- ? dataManifests
- : Iterables.filter(
- dataManifests,
+ ? closeableDataManifests
+ : CloseableIterable.filter(
+ scanMetrics.skippedDataManifests(),
+ closeableDataManifests,
manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest));
if (ignoreDeleted) {
@@ -251,7 +262,8 @@ class ManifestGroup {
// remove any manifests that don't have any existing or added files. if
either the added or
// existing files count is missing, the manifest must be scanned.
matchingManifests =
- Iterables.filter(
+ CloseableIterable.filter(
+ scanMetrics.skippedDataManifests(),
matchingManifests,
manifest -> manifest.hasAddedFiles() ||
manifest.hasExistingFiles());
}
@@ -261,12 +273,17 @@ class ManifestGroup {
// remove any manifests that don't have any deleted or added files. if
either the added or
// deleted files count is missing, the manifest must be scanned.
matchingManifests =
- Iterables.filter(
+ CloseableIterable.filter(
+ scanMetrics.skippedDataManifests(),
matchingManifests,
manifest -> manifest.hasAddedFiles() ||
manifest.hasDeletedFiles());
}
- matchingManifests = Iterables.filter(matchingManifests,
manifestPredicate::test);
+ matchingManifests =
+ CloseableIterable.filter(
+ scanMetrics.skippedDataManifests(), matchingManifests,
manifestPredicate);
+ matchingManifests =
+ CloseableIterable.count(scanMetrics.scannedDataManifests(),
matchingManifests);
return Iterables.transform(
matchingManifests,
@@ -281,7 +298,8 @@ class ManifestGroup {
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
- .select(columns);
+ .select(columns)
+ .scanMetrics(scanMetrics);
CloseableIterable<ManifestEntry<DataFile>> entries;
if (ignoreDeleted) {
@@ -325,6 +343,12 @@ class ManifestGroup {
entry -> {
DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
+ for (DeleteFile deleteFile : deleteFiles) {
+
ctx.scanMetrics().totalDeleteFileSizeInBytes().increment(deleteFile.fileSizeInBytes());
+ }
+
ctx.scanMetrics().totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
+ ctx.scanMetrics().resultDataFiles().increment();
+ ctx.scanMetrics().resultDeleteFiles().increment(deleteFiles.length);
return new BaseFileScanTask(
dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(),
ctx.residuals());
});
@@ -342,17 +366,20 @@ class ManifestGroup {
private final DeleteFileIndex deletes;
private final ResidualEvaluator residuals;
private final boolean dropStats;
+ private final ScanReport.ScanMetrics scanMetrics;
TaskContext(
PartitionSpec spec,
DeleteFileIndex deletes,
ResidualEvaluator residuals,
- boolean dropStats) {
+ boolean dropStats,
+ ScanReport.ScanMetrics scanMetrics) {
this.schemaAsString = SchemaParser.toJson(spec.schema());
this.specAsString = PartitionSpecParser.toJson(spec);
this.deletes = deletes;
this.residuals = residuals;
this.dropStats = dropStats;
+ this.scanMetrics = scanMetrics;
}
String schemaAsString() {
@@ -374,5 +401,9 @@ class ManifestGroup {
boolean shouldKeepStats() {
return !dropStats;
}
+
+ /**
+  * Returns the scan metrics that were passed to this context's constructor.
+  *
+  * <p>NOTE(review): this accessor is {@code public} while the neighbouring accessors ({@code
+  * schemaAsString()}, {@code shouldKeepStats()}) are package-private; presumably unintentional,
+  * confirm before aligning.
+  */
+ public ScanReport.ScanMetrics scanMetrics() {
+ return scanMetrics;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 19875e8084..7fc5678fbd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -92,6 +93,7 @@ public class ManifestReader<F extends ContentFile<F>> extends
CloseableGroup
private Schema fileProjection = null;
private Collection<String> columns = null;
private boolean caseSensitive = true;
+ private ScanReport.ScanMetrics scanMetrics = ScanReport.ScanMetrics.NOOP;
// lazily initialized
private Evaluator lazyEvaluator = null;
@@ -186,6 +188,11 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
return this;
}
+ /**
+  * Sets the scan metrics held by this reader, replacing the {@code ScanReport.ScanMetrics.NOOP}
+  * default.
+  *
+  * @param newScanMetrics metrics instance to store on the reader
+  * @return this reader, for call chaining
+  */
+ ManifestReader<F> scanMetrics(ScanReport.ScanMetrics newScanMetrics) {
+ this.scanMetrics = newScanMetrics;
+ return this;
+ }
+
CloseableIterable<ManifestEntry<F>> entries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue())
|| (partFilter != null && partFilter != Expressions.alwaysTrue())
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java
b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index beca0db87c..5ce966d034 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -24,6 +24,8 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ThreadPools;
@@ -42,6 +44,7 @@ final class TableScanContext {
private final Long toSnapshotId;
private final ExecutorService planExecutor;
private final boolean fromSnapshotInclusive;
+ private final ScanReporter scanReporter;
TableScanContext() {
this.snapshotId = null;
@@ -56,6 +59,7 @@ final class TableScanContext {
this.toSnapshotId = null;
this.planExecutor = null;
this.fromSnapshotInclusive = false;
+ this.scanReporter = new LoggingScanReporter();
}
private TableScanContext(
@@ -70,7 +74,8 @@ final class TableScanContext {
Long fromSnapshotId,
Long toSnapshotId,
ExecutorService planExecutor,
- boolean fromSnapshotInclusive) {
+ boolean fromSnapshotInclusive,
+ ScanReporter scanReporter) {
this.snapshotId = snapshotId;
this.rowFilter = rowFilter;
this.ignoreResiduals = ignoreResiduals;
@@ -83,6 +88,7 @@ final class TableScanContext {
this.toSnapshotId = toSnapshotId;
this.planExecutor = planExecutor;
this.fromSnapshotInclusive = fromSnapshotInclusive;
+ this.scanReporter = scanReporter;
}
Long snapshotId() {
@@ -102,7 +108,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
Expression rowFilter() {
@@ -122,7 +129,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
boolean ignoreResiduals() {
@@ -142,7 +150,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
boolean caseSensitive() {
@@ -162,7 +171,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
boolean returnColumnStats() {
@@ -182,7 +192,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
Collection<String> selectedColumns() {
@@ -204,7 +215,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
Schema projectedSchema() {
@@ -226,7 +238,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
Map<String, String> options() {
@@ -249,7 +262,8 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
Long fromSnapshotId() {
@@ -269,7 +283,8 @@ final class TableScanContext {
id,
toSnapshotId,
planExecutor,
- false);
+ false,
+ scanReporter);
}
TableScanContext fromSnapshotIdInclusive(long id) {
@@ -285,7 +300,8 @@ final class TableScanContext {
id,
toSnapshotId,
planExecutor,
- true);
+ true,
+ scanReporter);
}
boolean fromSnapshotInclusive() {
@@ -309,7 +325,8 @@ final class TableScanContext {
fromSnapshotId,
id,
planExecutor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
}
ExecutorService planExecutor() {
@@ -333,6 +350,28 @@ final class TableScanContext {
fromSnapshotId,
toSnapshotId,
executor,
- fromSnapshotInclusive);
+ fromSnapshotInclusive,
+ scanReporter);
+ }
+
+ /** Returns the reporter configured on this context for publishing scan reports. */
+ ScanReporter scanReporter() {
+ return scanReporter;
+ }
+
+ /**
+  * Returns a new context that uses the given reporter and copies every other setting from this
+  * context unchanged.
+  *
+  * @param reporter reporter to use in the returned context
+  * @return a new {@code TableScanContext}; this instance is not modified
+  */
+ TableScanContext reportWith(ScanReporter reporter) {
+ return new TableScanContext(
+ snapshotId,
+ rowFilter,
+ ignoreResiduals,
+ caseSensitive,
+ colStats,
+ projectedSchema,
+ selectedColumns,
+ options,
+ fromSnapshotId,
+ toSnapshotId,
+ planExecutor,
+ fromSnapshotInclusive,
+ reporter);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index c7892a7e7f..9da1efef69 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -56,6 +56,8 @@ import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.ResolvingFileIO;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -102,6 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
private ResourcePaths paths = null;
private Object conf = null;
private FileIO io = null;
+ private ScanReporter scanReporter = null;
// a lazy thread pool for token refresh
private volatile ScheduledExecutorService refreshExecutor = null;
@@ -171,6 +174,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
this.io =
CatalogUtil.loadFileIO(
ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(),
mergedProps, conf);
+ this.scanReporter = new LoggingScanReporter();
super.initialize(name, mergedProps);
}
@@ -280,7 +284,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
tableFileIO(response.config()),
response.tableMetadata());
- BaseTable table = new BaseTable(ops, fullTableName(loadedIdent));
+ BaseTable table = new BaseTable(ops, fullTableName(loadedIdent),
this.scanReporter);
if (metadataType != null) {
return MetadataTableUtils.createMetadataTableInstance(table,
metadataType);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java
b/core/src/test/java/org/apache/iceberg/TestTables.java
index 15ad6b900e..21b2c3fa58 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.metrics.ScanReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -71,6 +72,27 @@ public class TestTables {
return new TestTable(ops, name);
}
+ /**
+  * Creates a new test table under {@code temp} whose scans report to {@code scanReporter}.
+  *
+  * @throws AlreadyExistsException if table metadata already exists for {@code name} at {@code temp}
+  */
+ public static TestTable create(
+     File temp,
+     String name,
+     Schema schema,
+     PartitionSpec spec,
+     SortOrder sortOrder,
+     int formatVersion,
+     ScanReporter scanReporter) {
+   TestTableOperations tableOps = new TestTableOperations(name, temp);
+   boolean alreadyCreated = tableOps.current() != null;
+   if (alreadyCreated) {
+     throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
+   }
+
+   // the first commit has no base metadata, hence the null
+   TableMetadata initialMetadata =
+       newTableMetadata(
+           schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), formatVersion);
+   tableOps.commit(null, initialMetadata);
+
+   return new TestTable(tableOps, name, scanReporter);
+ }
+
public static Transaction beginCreate(File temp, String name, Schema schema,
PartitionSpec spec) {
return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
}
@@ -158,6 +180,11 @@ public class TestTables {
this.ops = ops;
}
+ /**
+  * Creates a test table that passes the given scan reporter to the superclass constructor and
+  * keeps the {@link TestTableOperations} so that {@code ops()} can return it.
+  */
+ private TestTable(TestTableOperations ops, String name, ScanReporter
scanReporter) {
+ super(ops, name, scanReporter);
+ this.ops = ops;
+ }
+
TestTableOperations ops() {
return ops;
}
diff --git
a/data/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
b/data/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
new file mode 100644
index 0000000000..84e651bcbf
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.metrics.LoggingScanReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReporter;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
+
+public class TestScanPlanningAndReporting extends TableTestBase {
+
+ // reporter handed to every table created by these tests; it keeps the reports it receives
+ private final TestScanReporter reporter = new TestScanReporter();
+
+ // NOTE(review): 2 is presumably the table format version (the tests pass the inherited
+ // formatVersion to TestTables.create); confirm against TableTestBase
+ public TestScanPlanningAndReporting() {
+ super(2);
+ }
+
+ /**
+  * Plans the table written by {@code createTableWithCustomRecords} twice, first without a filter
+  * and then with {@code x < "30"}, and checks the scan report captured by the test reporter after
+  * each planning run.
+  */
+ @Test
+ public void testScanPlanningWithReport() throws IOException {
+ String tableName = "simple-scan-planning";
+ Table table = createTableWithCustomRecords(tableName);
+ TableScan tableScan = table.newScan();
+
+ // should be 3 files
+ try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ fileScanTasks.forEach(task -> {});
+ }
+
+ ScanReport scanReport = reporter.lastReport();
+ assertThat(scanReport).isNotNull();
+
+ assertThat(scanReport.tableName()).isEqualTo(tableName);
+ assertThat(scanReport.snapshotId()).isEqualTo(1L);
+ assertThat(scanReport.filter()).isEqualTo(Expressions.alwaysTrue());
+
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+ .isGreaterThan(Duration.ZERO);
+
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(3);
+
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+ // NOTE(review): 1850L (and 616L below) are exact byte sizes of the Parquet files written by the
+ // helper and presumably change with the Parquet writer version; confirm before upgrading
+
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(1850L);
+
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+
+ // should be 1 file
+ try (CloseableIterable<FileScanTask> fileScanTasks =
+ tableScan.filter(Expressions.lessThan("x", "30")).planFiles()) {
+ fileScanTasks.forEach(task -> {});
+ }
+
+ scanReport = reporter.lastReport();
+ assertThat(scanReport).isNotNull();
+ assertThat(scanReport.tableName()).isEqualTo(tableName);
+ assertThat(scanReport.snapshotId()).isEqualTo(1L);
+
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+ .isGreaterThan(Duration.ZERO);
+
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(616L);
+
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+ }
+
+ /**
+  * Creates a table with an (int id, string x) schema and no partition fields, wired to the test
+  * reporter, and fast-appends three Parquet files to it in a single commit: one holding x = "23"
+  * and "45", one holding x = "30" and one holding x = "51".
+  */
+ private Table createTableWithCustomRecords(String tableName) throws IOException {
+   Types.NestedField idField = required(1, "id", Types.IntegerType.get());
+   Types.NestedField xField = required(2, "x", Types.StringType.get());
+   Schema tableSchema = new Schema(idField, xField);
+
+   // no partition fields are added to the spec builder
+   PartitionSpec tableSpec = PartitionSpec.builderFor(tableSchema).build();
+   Table table =
+       TestTables.create(
+           tableDir, tableName, tableSchema, tableSpec, SortOrder.unsorted(), formatVersion, reporter);
+
+   GenericRecord x23 = GenericRecord.create(tableSchema);
+   x23.setField("id", 1);
+   x23.setField("x", "23");
+
+   // the remaining records are copies of the first one with both fields overwritten
+   GenericRecord x30 = x23.copy(ImmutableMap.of("id", 2, "x", "30"));
+   GenericRecord x45 = x23.copy(ImmutableMap.of("id", 3, "x", "45"));
+   GenericRecord x51 = x23.copy(ImmutableMap.of("id", 4, "x", "51"));
+
+   DataFile firstFile = writeParquetFile(table, Arrays.asList(x23, x45));
+   DataFile secondFile = writeParquetFile(table, Arrays.asList(x30));
+   DataFile thirdFile = writeParquetFile(table, Arrays.asList(x51));
+
+   table
+       .newFastAppend()
+       .appendFile(firstFile)
+       .appendFile(secondFile)
+       .appendFile(thirdFile)
+       .commit();
+
+   return table;
+ }
+
+ /**
+  * Appends three data files, then adds delete files for two of them in a row delta, plans a scan
+  * and checks the delete-related counters of the captured scan report.
+  */
+ @Test
+ public void deleteScanning() throws IOException {
+ Table table =
+ TestTables.create(
+ tableDir,
+ "scan-planning-with-deletes",
+ SCHEMA,
+ SPEC,
+ SortOrder.unsorted(),
+ formatVersion,
+ reporter);
+
+
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit();
+ TableScan tableScan = table.newScan();
+
+ try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ fileScanTasks.forEach(task -> {});
+ }
+
+ ScanReport scanReport = reporter.lastReport();
+ assertThat(scanReport).isNotNull();
+ assertThat(scanReport.tableName()).isEqualTo("scan-planning-with-deletes");
+ // two commits were made above (append, then row delta)
+ assertThat(scanReport.snapshotId()).isEqualTo(2L);
+
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+ .isGreaterThan(Duration.ZERO);
+
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(3);
+
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(2);
+
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(30L);
+
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(20L);
+ }
+
+ /**
+  * Commits two separate appends of two files each, then checks the manifest-level counters of the
+  * captured scan report: first for an unfiltered scan, then for a scan with an equality filter on
+  * {@code data} that is expected to skip one of the two data manifests.
+  */
+ @Test
+ public void multipleDataManifests() throws IOException {
+ Table table =
+ TestTables.create(
+ tableDir,
+ "multiple-data-manifests",
+ SCHEMA,
+ SPEC,
+ SortOrder.unsorted(),
+ formatVersion,
+ reporter);
+
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ TableScan tableScan = table.newScan();
+
+ try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ fileScanTasks.forEach(task -> {});
+ }
+
+ ScanReport scanReport = reporter.lastReport();
+ assertThat(scanReport).isNotNull();
+ assertThat(scanReport.tableName()).isEqualTo("multiple-data-manifests");
+ assertThat(scanReport.snapshotId()).isEqualTo(2L);
+
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+ .isGreaterThan(Duration.ZERO);
+
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(4);
+
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(2);
+
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(2);
+
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(40L);
+
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+
+ // we should hit only a single data manifest and only a single data file
+ try (CloseableIterable<FileScanTask> fileScanTasks =
+ tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
+ fileScanTasks.forEach(task -> {});
+ }
+
+ scanReport = reporter.lastReport();
+ assertThat(scanReport).isNotNull();
+ assertThat(scanReport.tableName()).isEqualTo("multiple-data-manifests");
+ assertThat(scanReport.snapshotId()).isEqualTo(2L);
+
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
+ .isGreaterThan(Duration.ZERO);
+
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(1);
+
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(2);
+
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
+
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(10L);
+
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
+ }
+
+ /**
+  * Writes the given records to a new local Parquet file and describes that file as a {@link
+  * DataFile}.
+  *
+  * @param table table whose schema is used for writing and whose spec is used for the data file
+  * @param records records to write
+  * @return a data file built from the written file and the appender's metrics
+  * @throws IOException if the temporary file cannot be created, or writing or closing fails
+  */
+ private DataFile writeParquetFile(Table table, List<GenericRecord> records) throws IOException {
+   File parquetFile = temp.newFile();
+   // remove the file created by temp.newFile() again, presumably so the writer can create it itself
+   assertTrue(parquetFile.delete());
+
+   FileAppender<GenericRecord> appender =
+       Parquet.write(Files.localOutput(parquetFile))
+           .schema(table.schema())
+           .createWriterFunc(GenericParquetWriter::buildWriter)
+           .build();
+
+   // try-with-resources still closes the appender on every path, but unlike the manual
+   // try/finally it keeps a failure from addAll as the primary exception when close() fails too
+   try (FileAppender<GenericRecord> openAppender = appender) {
+     openAppender.addAll(records);
+   }
+
+   // metrics are read only after the appender has been closed, as before
+   return DataFiles.builder(table.spec())
+       .withInputFile(localInput(parquetFile))
+       .withMetrics(appender.metrics())
+       .withFormat(FileFormat.PARQUET)
+       .build();
+ }
+
+ /**
+  * Scan reporter for tests: keeps every report it receives, in arrival order, and forwards each
+  * one to a {@link LoggingScanReporter}.
+  */
+ private static class TestScanReporter implements ScanReporter {
+   // forwarding to the logging reporter is mainly so that scan reports show up in the test logs
+   private final LoggingScanReporter delegate = new LoggingScanReporter();
+   private final List<ScanReport> reports = Lists.newArrayList();
+
+   @Override
+   public void reportScan(ScanReport scanReport) {
+     reports.add(scanReport);
+     delegate.reportScan(scanReport);
+   }
+
+   /** Returns the most recently received report, or {@code null} if none has arrived yet. */
+   public ScanReport lastReport() {
+     int received = reports.size();
+     return received == 0 ? null : reports.get(received - 1);
+   }
+ }
+}