This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 880613e  Support dateCreated expressions in ScanSummary. (#2)
880613e is described below

commit 880613e0682ad5de6dc3857ab254bb3f7060cc9d
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Nov 28 15:30:53 2018 -0800

    Support dateCreated expressions in ScanSummary. (#2)
---
 .../main/java/com/netflix/iceberg/ScanSummary.java | 139 +++++++++++++++++++--
 .../java/com/netflix/iceberg/TestScanSummary.java  |  94 ++++++++++++++
 2 files changed, 223 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index 7a94486..b19ab38 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -19,17 +19,30 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.expressions.And;
+import com.netflix.iceberg.expressions.Expression;
+import com.netflix.iceberg.expressions.Expression.Operation;
+import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.Literal;
+import com.netflix.iceberg.expressions.NamedReference;
+import com.netflix.iceberg.expressions.UnboundPredicate;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.types.Comparators;
+import com.netflix.iceberg.types.Types;
+import com.netflix.iceberg.util.Pair;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Function;
 
@@ -51,15 +64,15 @@ public class ScanSummary {
   }
 
   public static class Builder {
+    private static final Set<String> TIMESTAMP_NAMES = Sets.newHashSet(
+        "dateCreated", "lastUpdated");
     private final TableScan scan;
     private final Table table;
     private final TableOperations ops;
     private final Map<Long, Long> snapshotTimestamps;
     private int limit = Integer.MAX_VALUE;
     private boolean throwIfLimited = false;
-    private boolean filterByTimestamp = false;
-    private long minTimestamp = 0L;
-    private long maxTimestamp = Long.MAX_VALUE;
+    private List<UnboundPredicate<Long>> timeFilters = Lists.newArrayList();
 
     public Builder(TableScan scan) {
       this.scan = scan;
@@ -72,17 +85,18 @@ public class ScanSummary {
       this.snapshotTimestamps = builder.build();
     }
 
-    public Builder after(long timestampMillis) {
+    private void addTimestampFilter(UnboundPredicate<Long> filter) {
       throwIfLimited(); // ensure all partitions can be returned
-      this.filterByTimestamp = true;
-      this.minTimestamp = timestampMillis;
+      timeFilters.add(filter);
+    }
+
+    public Builder after(long timestampMillis) {
+      addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis));
       return this;
     }
 
     public Builder before(long timestampMillis) {
-      throwIfLimited(); // ensure all partitions can be returned
-      this.filterByTimestamp = true;
-      this.maxTimestamp = timestampMillis;
+      addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis));
       return this;
     }
 
@@ -96,6 +110,28 @@ public class ScanSummary {
       return this;
     }
 
+    private void removeTimeFilters(List<Expression> expressions, Expression expression) {
+      if (expression.op() == Operation.AND) {
+        And and = (And) expression;
+        removeTimeFilters(expressions, and.left());
+        removeTimeFilters(expressions, and.right());
+        return;
+
+      } else if (expression instanceof UnboundPredicate) {
+        UnboundPredicate pred = (UnboundPredicate) expression;
+        NamedReference ref = (NamedReference) pred.ref();
+        Literal<?> lit = pred.literal();
+        if (TIMESTAMP_NAMES.contains(ref.name())) {
+          Literal<Long> tsLiteral = lit.to(Types.TimestampType.withoutZone());
+          long millis = toMillis(tsLiteral.value());
+          addTimestampFilter(Expressions.predicate(pred.op(), "timestamp_ms", millis));
+          return;
+        }
+      }
+
+      expressions.add(expression);
+    }
+
     /**
      * Summarizes a table scan as a map of partition key to metrics for that partition.
      *
@@ -105,9 +141,22 @@ public class ScanSummary {
       TopN<String, PartitionMetrics> topN = new TopN<>(
           limit, throwIfLimited, Comparators.charSequences());
 
+      List<Expression> filters = Lists.newArrayList();
+      removeTimeFilters(filters, Expressions.rewriteNot(scan.filter()));
+      Expression rowFilter = joinFilters(filters);
+
+      long minTimestamp = Long.MIN_VALUE;
+      long maxTimestamp = Long.MAX_VALUE;
+      boolean filterByTimestamp = !timeFilters.isEmpty();
+      if (filterByTimestamp) {
+        Pair<Long, Long> range = timestampRange(timeFilters);
+        minTimestamp = range.first();
+        maxTimestamp = range.second();
+      }
+
       try (CloseableIterable<ManifestEntry> entries =
                new ManifestGroup(ops, table.currentSnapshot().manifests())
-                   .filterData(scan.filter())
+                   .filterData(rowFilter)
                    .ignoreDeleted()
                    .select(SCAN_SUMMARY_COLUMNS)
                    .entries()) {
@@ -217,4 +266,74 @@ public class ScanSummary {
       return ImmutableMap.copyOf(map);
     }
   }
+
+  static Expression joinFilters(List<Expression> expressions) {
+    Expression result = Expressions.alwaysTrue();
+    for (Expression expression : expressions) {
+      result = Expressions.and(result, expression);
+    }
+    return result;
+  }
+
+  static long toMillis(long timestamp) {
+    if (timestamp < 10000000000L) {
+      // in seconds
+      return timestamp * 1000;
+    } else if (timestamp < 10000000000000L) {
+      // in millis
+      return timestamp;
+    }
+    // in micros
+    return timestamp / 1000;
+  }
+
+  static Pair<Long, Long> timestampRange(List<UnboundPredicate<Long>> timeFilters) {
+    // evaluation is inclusive
+    long minTimestamp = Long.MIN_VALUE;
+    long maxTimestamp = Long.MAX_VALUE;
+
+    for (UnboundPredicate<Long> pred : timeFilters) {
+      long value = pred.literal().value();
+      switch (pred.op()) {
+        case LT:
+          if (value - 1 < maxTimestamp) {
+            maxTimestamp = value - 1;
+          }
+          break;
+        case LT_EQ:
+          if (value < maxTimestamp) {
+            maxTimestamp = value;
+          }
+          break;
+        case GT:
+          if (value + 1 > minTimestamp) {
+            minTimestamp = value + 1;
+          }
+          break;
+        case GT_EQ:
+          if (value > minTimestamp) {
+            minTimestamp = value;
+          }
+          break;
+        case EQ:
+          if (value < maxTimestamp) {
+            maxTimestamp = value;
+          }
+          if (value > minTimestamp) {
+            minTimestamp = value;
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Cannot filter timestamps using predicate: " + pred);
+      }
+    }
+
+    if (maxTimestamp < minTimestamp) {
+      throw new IllegalArgumentException(
+          "No timestamps can match filters: " + Joiner.on(", ").join(timeFilters));
+    }
+
+    return Pair.of(minTimestamp, maxTimestamp);
+  }
 }
diff --git a/core/src/test/java/com/netflix/iceberg/TestScanSummary.java b/core/src/test/java/com/netflix/iceberg/TestScanSummary.java
new file mode 100644
index 0000000..b1b2973
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/TestScanSummary.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.iceberg.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.netflix.iceberg.ScanSummary.timestampRange;
+import static com.netflix.iceberg.ScanSummary.toMillis;
+import static com.netflix.iceberg.expressions.Expressions.equal;
+import static com.netflix.iceberg.expressions.Expressions.greaterThan;
+import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.lessThan;
+import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual;
+
+public class TestScanSummary {
+  @Test
+  public void testTimestampRanges() {
+    long lower = 1542750188523L;
+    long upper = 1542750695131L;
+
+    Assert.assertEquals("Should use inclusive bound",
+        Pair.of(Long.MIN_VALUE, upper),
+        timestampRange(ImmutableList.of(lessThanOrEqual("ts_ms", upper))));
+
+    Assert.assertEquals("Should use lower value for upper bound",
+        Pair.of(Long.MIN_VALUE, upper),
+        timestampRange(ImmutableList.of(
+            lessThanOrEqual("ts_ms", upper + 918234),
+            lessThanOrEqual("ts_ms", upper))));
+
+    Assert.assertEquals("Should make upper bound inclusive",
+        Pair.of(Long.MIN_VALUE, upper - 1),
+        timestampRange(ImmutableList.of(lessThan("ts_ms", upper))));
+
+    Assert.assertEquals("Should use inclusive bound",
+        Pair.of(lower, Long.MAX_VALUE),
+        timestampRange(ImmutableList.of(greaterThanOrEqual("ts_ms", lower))));
+
+    Assert.assertEquals("Should use upper value for lower bound",
+        Pair.of(lower, Long.MAX_VALUE),
+        timestampRange(ImmutableList.of(
+            greaterThanOrEqual("ts_ms", lower - 918234),
+            greaterThanOrEqual("ts_ms", lower))));
+
+    Assert.assertEquals("Should make lower bound inclusive",
+        Pair.of(lower + 1, Long.MAX_VALUE),
+        timestampRange(ImmutableList.of(greaterThan("ts_ms", lower))));
+
+    Assert.assertEquals("Should set both bounds for equals",
+        Pair.of(lower, lower),
+        timestampRange(ImmutableList.of(equal("ts_ms", lower))));
+
+    Assert.assertEquals("Should set both bounds",
+        Pair.of(lower, upper - 1),
+        timestampRange(ImmutableList.of(
+            greaterThanOrEqual("ts_ms", lower),
+            lessThan("ts_ms", upper))));
+
+    // >= lower and < lower is an empty range
+    AssertHelpers.assertThrows("Should reject empty ranges",
+        IllegalArgumentException.class, "No timestamps can match filters",
+        () -> timestampRange(ImmutableList.of(
+            greaterThanOrEqual("ts_ms", lower),
+            lessThan("ts_ms", lower))));
+  }
+
+  @Test
+  public void testToMillis() {
+    long millis = 1542750947417L;
+    Assert.assertEquals(1542750947000L, toMillis(millis / 1000));
+    Assert.assertEquals(1542750947417L, toMillis(millis));
+    Assert.assertEquals(1542750947417L, toMillis(millis * 1000 + 918));
+  }
+}

Reply via email to