This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d687f96  Spark: Add snapshot selection options to reads (#61)
d687f96 is described below

commit d687f96beb4e0b523ae86d814a720fdbc0299b4b
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Jun 10 10:36:27 2019 -0700

    Spark: Add snapshot selection options to reads (#61)
---
 .../apache/iceberg/spark/source/IcebergSource.java |   2 +-
 .../org/apache/iceberg/spark/source/Reader.java    |  19 +-
 .../spark/source/TestSnapshotSelection.java        | 237 +++++++++++++++++++++
 3 files changed, 256 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 097184a..b3a5fc3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -61,7 +61,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive", "true");
 
-    return new Reader(table, Boolean.valueOf(caseSensitive));
+    return new Reader(table, Boolean.valueOf(caseSensitive), options);
   }
 
   @Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 63a33f9..257b876 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -68,6 +68,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
@@ -99,6 +100,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final Table table;
+  private final Long snapshotId;
+  private final Long asOfTimestamp;
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
   private final boolean caseSensitive;
@@ -111,8 +114,14 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private StructType type = null; // cached because Spark accesses it multiple times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  Reader(Table table, boolean caseSensitive) {
+  Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
     this.table = table;
+    this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
+    this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+    if (snapshotId != null && asOfTimestamp != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+    }
     this.schema = table.schema();
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
@@ -219,6 +228,14 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           .caseSensitive(caseSensitive)
           .project(lazySchema());
 
+      if (snapshotId != null) {
+        scan = scan.useSnapshot(snapshotId);
+      }
+
+      if (asOfTimestamp != null) {
+        scan = scan.asOfTime(asOfTimestamp);
+      }
+
       if (filterExpressions != null) {
         for (Expression filter : filterExpressions) {
           scan = scan.filter(filter);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
new file mode 100644
index 0000000..cbd9b1d
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestSnapshotSelection {
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession spark = TestSnapshotSelection.spark;
+    TestSnapshotSelection.spark = null;
+    spark.stop();
+  }
+
+  @Test
+  public void testSnapshotSelectionById() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // verify records in the current snapshot
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    expectedRecords.addAll(secondBatchRecords);
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+
+    // verify records in the previous snapshot
+    Snapshot currentSnapshot = table.currentSnapshot();
+    Long parentSnapshotId = currentSnapshot.parentId();
+    Dataset<Row> previousSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("snapshot-id", parentSnapshotId)
+        .load(tableLocation);
+    List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByTimestamp() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // remember the time when the first snapshot was valid
+    long firstSnapshotTimestamp = System.currentTimeMillis();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // verify records in the current snapshot
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    expectedRecords.addAll(secondBatchRecords);
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+
+    // verify records in the previous snapshot
+    Dataset<Row> previousSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("as-of-timestamp", firstSnapshotTimestamp)
+        .load(tableLocation);
+    List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSnapshotSelectionByInvalidSnapshotId() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    tables.create(SCHEMA, spec, tableLocation);
+
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option("snapshot-id", -10)
+        .load(tableLocation);
+
+    df.collectAsList();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSnapshotSelectionByInvalidTimestamp() throws IOException {
+    long timestamp = System.currentTimeMillis();
+
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    tables.create(SCHEMA, spec, tableLocation);
+
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option("as-of-timestamp", timestamp)
+        .load(tableLocation);
+
+    df.collectAsList();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    long timestamp = System.currentTimeMillis();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option("snapshot-id", snapshotId)
+        .option("as-of-timestamp", timestamp)
+        .load(tableLocation);
+
+    df.collectAsList();
+  }
+}

Reply via email to