This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d687f96 Spark: Add snapshot selection options to reads (#61)
d687f96 is described below
commit d687f96beb4e0b523ae86d814a720fdbc0299b4b
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Jun 10 10:36:27 2019 -0700
Spark: Add snapshot selection options to reads (#61)
---
.../apache/iceberg/spark/source/IcebergSource.java | 2 +-
.../org/apache/iceberg/spark/source/Reader.java | 19 +-
.../spark/source/TestSnapshotSelection.java | 237 +++++++++++++++++++++
3 files changed, 256 insertions(+), 2 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 097184a..b3a5fc3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -61,7 +61,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
Table table = getTableAndResolveHadoopConfiguration(options, conf);
String caseSensitive =
lazySparkSession().conf().get("spark.sql.caseSensitive", "true");
- return new Reader(table, Boolean.valueOf(caseSensitive));
+ return new Reader(table, Boolean.valueOf(caseSensitive), options);
}
@Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 63a33f9..257b876 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -68,6 +68,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
@@ -99,6 +100,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private static final Filter[] NO_FILTERS = new Filter[0];
private final Table table;
+ private final Long snapshotId;
+ private final Long asOfTimestamp;
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
private final boolean caseSensitive;
@@ -111,8 +114,14 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private StructType type = null; // cached because Spark accesses it multiple
times
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
- Reader(Table table, boolean caseSensitive) {
+ Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
this.table = table;
+ this.snapshotId =
options.get("snapshot-id").map(Long::parseLong).orElse(null);
+ this.asOfTimestamp =
options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+ if (snapshotId != null && asOfTimestamp != null) {
+ throw new IllegalArgumentException(
+ "Cannot scan using both snapshot-id and as-of-timestamp to select
the table snapshot");
+ }
this.schema = table.schema();
this.fileIo = table.io();
this.encryptionManager = table.encryption();
@@ -219,6 +228,14 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
.caseSensitive(caseSensitive)
.project(lazySchema());
+ if (snapshotId != null) {
+ scan = scan.useSnapshot(snapshotId);
+ }
+
+ if (asOfTimestamp != null) {
+ scan = scan.asOfTime(asOfTimestamp);
+ }
+
if (filterExpressions != null) {
for (Expression filter : filterExpressions) {
scan = scan.filter(filter);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
new file mode 100644
index 0000000..cbd9b1d
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * Tests for the {@code snapshot-id} and {@code as-of-timestamp} read options of the Iceberg
+ * Spark source, which select the table snapshot a read is planned against.
+ */
+public class TestSnapshotSelection {
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSnapshotSelection.spark;
+    TestSnapshotSelection.spark = null;
+    // guard against a failed startSpark() so teardown does not mask the original error
+    if (currentSpark != null) {
+      currentSpark.stop();
+    }
+  }
+
+  @Test
+  public void testSnapshotSelectionById() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    Table table = createTable(tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    append(firstBatchRecords, tableLocation);
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    append(secondBatchRecords, tableLocation);
+
+    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // verify records in the current snapshot
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    expectedRecords.addAll(secondBatchRecords);
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .load(tableLocation);
+    Assert.assertEquals("Current snapshot rows should match",
+        expectedRecords, collectSortedById(currentSnapshotResult));
+
+    // verify records in the previous snapshot
+    Snapshot currentSnapshot = table.currentSnapshot();
+    Long parentSnapshotId = currentSnapshot.parentId();
+    // fail with a clear message rather than an NPE while unboxing the option value
+    Assert.assertNotNull("Current snapshot should have a parent", parentSnapshotId);
+    Dataset<Row> previousSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("snapshot-id", parentSnapshotId)
+        .load(tableLocation);
+    Assert.assertEquals("Previous snapshot rows should match",
+        firstBatchRecords, collectSortedById(previousSnapshotResult));
+  }
+
+  @Test
+  public void testSnapshotSelectionByTimestamp() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    Table table = createTable(tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    append(firstBatchRecords, tableLocation);
+
+    // remember the time when the first snapshot was valid
+    long firstSnapshotTimestamp = System.currentTimeMillis();
+    // the second commit must be stamped strictly after firstSnapshotTimestamp; if both
+    // fell in the same millisecond, as-of-timestamp would select the second snapshot
+    waitUntilAfter(firstSnapshotTimestamp);
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    append(secondBatchRecords, tableLocation);
+
+    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // verify records in the current snapshot
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    expectedRecords.addAll(secondBatchRecords);
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .load(tableLocation);
+    Assert.assertEquals("Current snapshot rows should match",
+        expectedRecords, collectSortedById(currentSnapshotResult));
+
+    // verify records in the previous snapshot
+    Dataset<Row> previousSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("as-of-timestamp", firstSnapshotTimestamp)
+        .load(tableLocation);
+    Assert.assertEquals("Previous snapshot rows should match",
+        firstBatchRecords, collectSortedById(previousSnapshotResult));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSnapshotSelectionByInvalidSnapshotId() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    createTable(tableLocation);
+
+    // -10 is not the id of any snapshot in the (empty) table
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option("snapshot-id", -10)
+        .load(tableLocation);
+
+    df.collectAsList();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSnapshotSelectionByInvalidTimestamp() throws IOException {
+    long timestamp = System.currentTimeMillis();
+
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    createTable(tableLocation);
+
+    // the table has no snapshots at all, so no snapshot can be selected for this timestamp
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option("as-of-timestamp", timestamp)
+        .load(tableLocation);
+
+    df.collectAsList();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    Table table = createTable(tableLocation);
+
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    append(firstBatchRecords, tableLocation);
+
+    long timestamp = System.currentTimeMillis();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    // the Reader constructor rejects setting both options; depending on when Spark creates
+    // the reader, the exception may surface from load() or from collectAsList()
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option("snapshot-id", snapshotId)
+        .option("as-of-timestamp", timestamp)
+        .load(tableLocation);
+
+    df.collectAsList();
+  }
+
+  /** Creates an empty, unpartitioned Hadoop table with {@link #SCHEMA} at the given location. */
+  private static Table createTable(String tableLocation) {
+    HadoopTables tables = new HadoopTables(CONF);
+    return tables.create(SCHEMA, PartitionSpec.unpartitioned(), tableLocation);
+  }
+
+  /** Appends the records to the Iceberg table at tableLocation through the Spark writer. */
+  private static void append(List<SimpleRecord> records, String tableLocation) {
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+  }
+
+  /** Collects the rows of df ordered by id and decoded as SimpleRecord beans. */
+  private static List<SimpleRecord> collectSortedById(Dataset<Row> df) {
+    return df.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+  }
+
+  /** Busy-waits (at most about one millisecond) until the wall clock is past timestampMillis. */
+  private static void waitUntilAfter(long timestampMillis) {
+    long current = System.currentTimeMillis();
+    while (current <= timestampMillis) {
+      current = System.currentTimeMillis();
+    }
+  }
+}