This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 880613e Support dateCreated expressions in ScanSummary. (#2)
880613e is described below
commit 880613e0682ad5de6dc3857ab254bb3f7060cc9d
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Nov 28 15:30:53 2018 -0800
Support dateCreated expressions in ScanSummary. (#2)
---
.../main/java/com/netflix/iceberg/ScanSummary.java | 139 +++++++++++++++++++--
.../java/com/netflix/iceberg/TestScanSummary.java | 94 ++++++++++++++
2 files changed, 223 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index 7a94486..b19ab38 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -19,17 +19,30 @@
package com.netflix.iceberg;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.expressions.And;
+import com.netflix.iceberg.expressions.Expression;
+import com.netflix.iceberg.expressions.Expression.Operation;
+import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.Literal;
+import com.netflix.iceberg.expressions.NamedReference;
+import com.netflix.iceberg.expressions.UnboundPredicate;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.types.Comparators;
+import com.netflix.iceberg.types.Types;
+import com.netflix.iceberg.util.Pair;
import java.io.IOException;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
@@ -51,15 +64,15 @@ public class ScanSummary {
}
public static class Builder {
+ /**
+ * Filter field names that are treated as timestamp filters: predicates on these names are
+ * removed from the row filter and converted to millisecond predicates on "timestamp_ms".
+ */
+ private static final Set<String> TIMESTAMP_NAMES = Sets.newHashSet(
+ "dateCreated", "lastUpdated");
private final TableScan scan;
private final Table table;
private final TableOperations ops;
private final Map<Long, Long> snapshotTimestamps;
private int limit = Integer.MAX_VALUE;
private boolean throwIfLimited = false;
- private boolean filterByTimestamp = false;
- private long minTimestamp = 0L;
- private long maxTimestamp = Long.MAX_VALUE;
+ // predicates on "timestamp_ms" from after(), before(), and rewritten scan-filter conjuncts;
+ // the list is only appended to, never reassigned
+ private final List<UnboundPredicate<Long>> timeFilters = Lists.newArrayList();
public Builder(TableScan scan) {
this.scan = scan;
@@ -72,17 +85,18 @@ public class ScanSummary {
this.snapshotTimestamps = builder.build();
}
- public Builder after(long timestampMillis) {
+ /**
+ * Records a predicate on "timestamp_ms" in this builder's time filters.
+ * <p>
+ * Also calls throwIfLimited() to ensure all partitions can be returned when
+ * results are filtered by time.
+ */
+ private void addTimestampFilter(UnboundPredicate<Long> filter) {
throwIfLimited(); // ensure all partitions can be returned
- this.filterByTimestamp = true;
- this.minTimestamp = timestampMillis;
+ timeFilters.add(filter);
+ }
+
+ /**
+ * Adds an inclusive lower bound, {@code timestamp_ms >= timestampMillis}.
+ *
+ * @param timestampMillis the lower bound in milliseconds, inclusive
+ * @return this builder
+ */
+ public Builder after(long timestampMillis) {
+ addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis));
return this;
}
+ /**
+ * Adds an inclusive upper bound, {@code timestamp_ms <= timestampMillis}.
+ *
+ * @param timestampMillis the upper bound in milliseconds, inclusive
+ * @return this builder
+ */
public Builder before(long timestampMillis) {
- throwIfLimited(); // ensure all partitions can be returned
- this.filterByTimestamp = true;
- this.maxTimestamp = timestampMillis;
+ addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis));
return this;
}
@@ -96,6 +110,28 @@ public class ScanSummary {
return this;
}
+ /**
+  * Splits a filter into timestamp predicates and everything else.
+  * <p>
+  * AND nodes are traversed recursively. Each unbound predicate on a name in TIMESTAMP_NAMES is
+  * converted to a predicate on "timestamp_ms" in milliseconds (via toMillis) and registered
+  * with addTimestampFilter. Every other expression is appended to {@code expressions}
+  * unchanged. Callers are expected to apply Expressions.rewriteNot first, so that NOT nodes
+  * do not hide timestamp predicates.
+  *
+  * @param expressions output list that receives the non-timestamp expressions
+  * @param expression the filter to split
+  * @throws IllegalArgumentException if a timestamp predicate has no literal, or its literal
+  *     cannot be converted to a timestamp
+  */
+ private void removeTimeFilters(List<Expression> expressions, Expression expression) {
+   if (expression.op() == Operation.AND) {
+     And and = (And) expression;
+     removeTimeFilters(expressions, and.left());
+     removeTimeFilters(expressions, and.right());
+     return;
+   }
+
+   if (expression instanceof UnboundPredicate) {
+     UnboundPredicate<?> pred = (UnboundPredicate<?>) expression;
+     NamedReference ref = (NamedReference) pred.ref();
+     if (TIMESTAMP_NAMES.contains(ref.name())) {
+       Literal<?> lit = pred.literal();
+       if (lit == null) {
+         // presumably a unary predicate (e.g. isNull); there is no value to turn into a bound
+         throw new IllegalArgumentException(
+             "Cannot filter timestamps using predicate: " + pred);
+       }
+       Literal<Long> tsLiteral = lit.to(Types.TimestampType.withoutZone());
+       if (tsLiteral == null) {
+         // NOTE(review): Literal.to is assumed to return null for unsupported conversions
+         throw new IllegalArgumentException(
+             "Cannot convert literal to timestamp in predicate: " + pred);
+       }
+       long millis = toMillis(tsLiteral.value());
+       addTimestampFilter(Expressions.predicate(pred.op(), "timestamp_ms", millis));
+       return;
+     }
+   }
+
+   expressions.add(expression);
+ }
+
/**
* Summarizes a table scan as a map of partition key to metrics for that partition.
*
@@ -105,9 +141,22 @@ public class ScanSummary {
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());
+ List<Expression> filters = Lists.newArrayList();
+ removeTimeFilters(filters, Expressions.rewriteNot(scan.filter()));
+ Expression rowFilter = joinFilters(filters);
+
+ long minTimestamp = Long.MIN_VALUE;
+ long maxTimestamp = Long.MAX_VALUE;
+ boolean filterByTimestamp = !timeFilters.isEmpty();
+ if (filterByTimestamp) {
+ Pair<Long, Long> range = timestampRange(timeFilters);
+ minTimestamp = range.first();
+ maxTimestamp = range.second();
+ }
+
try (CloseableIterable<ManifestEntry> entries =
new ManifestGroup(ops, table.currentSnapshot().manifests())
- .filterData(scan.filter())
+ .filterData(rowFilter)
.ignoreDeleted()
.select(SCAN_SUMMARY_COLUMNS)
.entries()) {
@@ -217,4 +266,74 @@ public class ScanSummary {
return ImmutableMap.copyOf(map);
}
}
+
+ /**
+  * ANDs the given expressions together, left to right.
+  *
+  * @param expressions the expressions to combine, in order
+  * @return the conjunction; folding starts from alwaysTrue(), which is returned for an empty list
+  */
+ static Expression joinFilters(List<Expression> expressions) {
+   // sequential reduce applies and(acc, next) in list order, starting from the identity
+   return expressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+ }
+
+ /**
+  * Converts a timestamp of unknown precision to milliseconds, guessing the unit from magnitude.
+  * <p>
+  * The unit is chosen by absolute value:
+  * <ul>
+  *   <li>below 10^10 is treated as seconds;</li>
+  *   <li>below 10^13 is treated as milliseconds;</li>
+  *   <li>anything larger is treated as microseconds.</li>
+  * </ul>
+  * The bounds are checked on both sides of zero so that negative (presumably pre-epoch)
+  * values are classified the same way as positive ones.
+  *
+  * @param timestamp a timestamp in seconds, milliseconds, or microseconds
+  * @return the timestamp in milliseconds
+  */
+ static long toMillis(long timestamp) {
+   if (timestamp > -10000000000L && timestamp < 10000000000L) {
+     // in seconds; the magnitude bound keeps the multiplication from overflowing
+     return timestamp * 1000;
+   } else if (timestamp > -10000000000000L && timestamp < 10000000000000L) {
+     // in millis
+     return timestamp;
+   }
+   // in micros; floorDiv rounds negative values down rather than toward zero
+   return Math.floorDiv(timestamp, 1000L);
+ }
+
+ /**
+  * Intersects timestamp predicates into one inclusive [min, max] range.
+  * <p>
+  * Strict bounds are made inclusive: LT uses value - 1 and GT uses value + 1. A side with no
+  * bound is reported as Long.MIN_VALUE or Long.MAX_VALUE.
+  *
+  * @param timeFilters predicates using LT, LT_EQ, GT, GT_EQ, or EQ with a Long literal
+  * @return a pair of (inclusive lower bound, inclusive upper bound)
+  * @throws UnsupportedOperationException if a predicate uses any other operation
+  * @throws IllegalArgumentException if no timestamp can satisfy all of the predicates
+  */
+ static Pair<Long, Long> timestampRange(List<UnboundPredicate<Long>> timeFilters) {
+   // evaluation is inclusive
+   long minTimestamp = Long.MIN_VALUE;
+   long maxTimestamp = Long.MAX_VALUE;
+   // set when a strict bound excludes every long, e.g. "< Long.MIN_VALUE"
+   boolean empty = false;
+
+   for (UnboundPredicate<Long> pred : timeFilters) {
+     long value = pred.literal().value();
+     switch (pred.op()) {
+       case LT:
+         if (value == Long.MIN_VALUE) {
+           empty = true; // value - 1 would overflow to Long.MAX_VALUE
+         } else {
+           maxTimestamp = Math.min(maxTimestamp, value - 1);
+         }
+         break;
+       case LT_EQ:
+         maxTimestamp = Math.min(maxTimestamp, value);
+         break;
+       case GT:
+         if (value == Long.MAX_VALUE) {
+           empty = true; // value + 1 would overflow to Long.MIN_VALUE
+         } else {
+           minTimestamp = Math.max(minTimestamp, value + 1);
+         }
+         break;
+       case GT_EQ:
+         minTimestamp = Math.max(minTimestamp, value);
+         break;
+       case EQ:
+         maxTimestamp = Math.min(maxTimestamp, value);
+         minTimestamp = Math.max(minTimestamp, value);
+         break;
+       default:
+         throw new UnsupportedOperationException(
+             "Cannot filter timestamps using predicate: " + pred);
+     }
+   }
+
+   if (empty || maxTimestamp < minTimestamp) {
+     throw new IllegalArgumentException(
+         "No timestamps can match filters: " + Joiner.on(", ").join(timeFilters));
+   }
+
+   return Pair.of(minTimestamp, maxTimestamp);
+ }
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestScanSummary.java b/core/src/test/java/com/netflix/iceberg/TestScanSummary.java
new file mode 100644
index 0000000..b1b2973
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/TestScanSummary.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.iceberg.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.netflix.iceberg.ScanSummary.timestampRange;
+import static com.netflix.iceberg.ScanSummary.toMillis;
+import static com.netflix.iceberg.expressions.Expressions.equal;
+import static com.netflix.iceberg.expressions.Expressions.greaterThan;
+import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.lessThan;
+import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual;
+
+public class TestScanSummary {
+  private static final String TS_COLUMN = "ts_ms";
+  private static final long SLACK_MS = 918234L;
+
+  /**
+   * Checks that timestampRange keeps the tightest bounds, makes strict bounds inclusive, and
+   * rejects ranges that no timestamp can satisfy.
+   */
+  @Test
+  public void testTimestampRanges() {
+    long lowerMs = 1542750188523L;
+    long upperMs = 1542750695131L;
+
+    // upper bounds only
+    Assert.assertEquals("Should use inclusive bound",
+        Pair.of(Long.MIN_VALUE, upperMs),
+        timestampRange(ImmutableList.of(lessThanOrEqual(TS_COLUMN, upperMs))));
+    Assert.assertEquals("Should use lower value for upper bound",
+        Pair.of(Long.MIN_VALUE, upperMs),
+        timestampRange(ImmutableList.of(
+            lessThanOrEqual(TS_COLUMN, upperMs + SLACK_MS),
+            lessThanOrEqual(TS_COLUMN, upperMs))));
+    Assert.assertEquals("Should make upper bound inclusive",
+        Pair.of(Long.MIN_VALUE, upperMs - 1),
+        timestampRange(ImmutableList.of(lessThan(TS_COLUMN, upperMs))));
+
+    // lower bounds only
+    Assert.assertEquals("Should use inclusive bound",
+        Pair.of(lowerMs, Long.MAX_VALUE),
+        timestampRange(ImmutableList.of(greaterThanOrEqual(TS_COLUMN, lowerMs))));
+    Assert.assertEquals("Should use upper value for lower bound",
+        Pair.of(lowerMs, Long.MAX_VALUE),
+        timestampRange(ImmutableList.of(
+            greaterThanOrEqual(TS_COLUMN, lowerMs - SLACK_MS),
+            greaterThanOrEqual(TS_COLUMN, lowerMs))));
+    Assert.assertEquals("Should make lower bound inclusive",
+        Pair.of(lowerMs + 1, Long.MAX_VALUE),
+        timestampRange(ImmutableList.of(greaterThan(TS_COLUMN, lowerMs))));
+
+    // both bounds
+    Assert.assertEquals("Should set both bounds for equals",
+        Pair.of(lowerMs, lowerMs),
+        timestampRange(ImmutableList.of(equal(TS_COLUMN, lowerMs))));
+    Assert.assertEquals("Should set both bounds",
+        Pair.of(lowerMs, upperMs - 1),
+        timestampRange(ImmutableList.of(
+            greaterThanOrEqual(TS_COLUMN, lowerMs),
+            lessThan(TS_COLUMN, upperMs))));
+
+    // >= lower and < lower is an empty range
+    AssertHelpers.assertThrows("Should reject empty ranges",
+        IllegalArgumentException.class, "No timestamps can match filters",
+        () -> timestampRange(ImmutableList.of(
+            greaterThanOrEqual(TS_COLUMN, lowerMs),
+            lessThan(TS_COLUMN, lowerMs))));
+  }
+
+  /** Checks that toMillis infers seconds, milliseconds, and microseconds from magnitude. */
+  @Test
+  public void testToMillis() {
+    long millis = 1542750947417L;
+    long seconds = millis / 1000;
+    long micros = millis * 1000 + 918;
+
+    Assert.assertEquals(1542750947000L, toMillis(seconds));
+    Assert.assertEquals(1542750947417L, toMillis(millis));
+    Assert.assertEquals(1542750947417L, toMillis(micros));
+  }
+}