This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 634e149 Add Java code examples and update site docs (#678)
634e149 is described below
commit 634e14972f25666b22af9cdab1718eef2831e9e4
Author: cmathiesen <[email protected]>
AuthorDate: Thu Apr 9 04:20:44 2020 +0100
Add Java code examples and update site docs (#678)
---
site/docs/api-quickstart.md | 2 +-
site/docs/evolution.md | 5 +
site/docs/getting-started.md | 63 ++++++-
site/docs/img/partition-spec-evolution.png | Bin 0 -> 224020 bytes
.../{api-quickstart.md => java-api-quickstart.md} | 91 +++++-----
site/mkdocs.yml | 11 +-
.../apache/iceberg/examples/ConcurrencyTest.java | 127 ++++++++++++++
.../java/org/apache/iceberg/examples/README.md | 194 +++++++++++++++++++++
.../iceberg/examples/ReadAndWriteTablesTest.java | 164 +++++++++++++++++
.../iceberg/examples/SchemaEvolutionTest.java | 173 ++++++++++++++++++
.../org/apache/iceberg/examples/SimpleRecord.java | 80 +++++++++
.../examples/SnapshotFunctionalityTest.java | 153 ++++++++++++++++
spark/src/test/resources/data/books.json | 6 +
spark/src/test/resources/data/new-books.json | 4 +
14 files changed, 1018 insertions(+), 55 deletions(-)
diff --git a/site/docs/api-quickstart.md b/site/docs/api-quickstart.md
index 4e369d3..00f7f35 100644
--- a/site/docs/api-quickstart.md
+++ b/site/docs/api-quickstart.md
@@ -15,7 +15,7 @@
- limitations under the License.
-->
-# API Quickstart
+# Spark API Quickstart
## Create a table
diff --git a/site/docs/evolution.md b/site/docs/evolution.md
index 37066ef..343b77d 100644
--- a/site/docs/evolution.md
+++ b/site/docs/evolution.md
@@ -54,6 +54,11 @@ Iceberg uses unique IDs to track each column in a table.
When you add a column,
Iceberg table partitioning can be updated in an existing table because queries
do not reference partition values directly.
+When you evolve a partition spec, the old data written with an earlier spec remains unchanged. New data is written using the new spec in a new layout. Metadata for each of the partition versions is kept separately. Because of this, when you start writing queries, you get split planning: each partition layout plans files separately, using the filter it derives for that specific layout. Here's a visual representation of a contrived example:
+
+
+*The data for 2008 is partitioned by month. Starting from 2009, the table is updated so that the data is instead partitioned by day. Both partitioning layouts are able to coexist in the same table.*
+
Iceberg uses [hidden partitioning](../partitioning), so you don't *need* to
write queries for a specific partition layout to be fast. Instead, you can
write queries that select the data you need, and Iceberg automatically prunes
out files that don't contain matching data.
Partition evolution is a metadata operation and does not eagerly rewrite files.
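The split planning described above can be illustrated with a toy model in plain Java. This is a sketch, not Iceberg's planner: the `FileEntry` class, the spec IDs, and the file names are hypothetical. It only shows how one date-range filter is translated separately for a month-partitioned layout and a day-partitioned layout living in the same table.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

public class SplitPlanningSketch {
  // Hypothetical spec IDs: 0 = partitioned by month, 1 = partitioned by day.
  static final int MONTH_SPEC = 0;
  static final int DAY_SPEC = 1;

  // A toy data-file entry: its path, the spec it was written with, and its partition value.
  static final class FileEntry {
    final String path;
    final int specId;
    final String partition; // "yyyy-MM" for MONTH_SPEC, "yyyy-MM-dd" for DAY_SPEC

    FileEntry(String path, int specId, String partition) {
      this.path = path;
      this.specId = specId;
      this.partition = partition;
    }
  }

  // Plans files for the row filter "start <= event_date <= end".
  // Each layout derives its own partition filter from that same row filter.
  static List<String> planFiles(List<FileEntry> files, LocalDate start, LocalDate end) {
    YearMonth startMonth = YearMonth.from(start);
    YearMonth endMonth = YearMonth.from(end);
    List<String> selected = new ArrayList<>();
    for (FileEntry file : files) {
      boolean mayMatch;
      if (file.specId == MONTH_SPEC) {
        // Month layout: keep any month that overlaps the requested range.
        YearMonth month = YearMonth.parse(file.partition);
        mayMatch = !month.isBefore(startMonth) && !month.isAfter(endMonth);
      } else {
        // Day layout: keep only days inside the requested range.
        LocalDate day = LocalDate.parse(file.partition);
        mayMatch = !day.isBefore(start) && !day.isAfter(end);
      }
      if (mayMatch) {
        selected.add(file.path);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<FileEntry> files = List.of(
        new FileEntry("month/2008-11.parquet", MONTH_SPEC, "2008-11"),
        new FileEntry("month/2008-12.parquet", MONTH_SPEC, "2008-12"),
        new FileEntry("day/2009-01-01.parquet", DAY_SPEC, "2009-01-01"),
        new FileEntry("day/2009-01-02.parquet", DAY_SPEC, "2009-01-02"));
    // A query whose range crosses the point where the spec changed.
    System.out.println(planFiles(files, LocalDate.of(2008, 12, 15), LocalDate.of(2009, 1, 1)));
    // prints [month/2008-12.parquet, day/2009-01-01.parquet]
  }
}
```

The December 2008 file is kept even though the query starts on the 15th, because the month layout can only prune whole months; the remaining rows still have to be filtered when the file is read.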
diff --git a/site/docs/getting-started.md b/site/docs/getting-started.md
index 12db7ba..4fe55ee 100644
--- a/site/docs/getting-started.md
+++ b/site/docs/getting-started.md
@@ -15,4 +15,65 @@
- limitations under the License.
-->
-## Getting Started
+# Getting Started
+
+## Including Iceberg
+
+### Downloads
+
+The latest version of Iceberg is
[0.7.0-incubating](https://github.com/apache/incubator-iceberg/releases/tag/apache-iceberg-0.7.0-incubating).
+
+* [0.7.0-incubating source tar.gz](https://www.apache.org/dyn/closer.cgi/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz) -- [signature](https://dist.apache.org/repos/dist/release/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz.asc) -- [sha512](https://dist.apache.org/repos/dist/release/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz.sha512)
+* [0.7.0-incubating Spark 2.4 runtime Jar](https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime/0.7.0-incubating/iceberg-spark-runtime-0.7.0-incubating.jar)
+
+One way to use Iceberg in Spark 2.4 is to download the runtime Jar and add it
to the jars folder of your Spark install.
+
+Spark 2.4 is limited to reading and writing existing Iceberg tables. Use the
[Iceberg API](../api) to create Iceberg tables.
+
+The recommended way is to include Iceberg's latest release using the
`--packages` option:
+```sh
+spark-shell --packages org.apache.iceberg:iceberg-spark-runtime:0.7.0-incubating
+```
+
+You can also build Iceberg locally, and add the jar to Spark's classpath. This
can be helpful to test unreleased features or while developing something new:
+
+```sh
+./gradlew assemble
+
+spark-shell --jars spark-runtime/build/libs/iceberg-spark-runtime-93990904.jar
+```
+
+Replace `93990904` with the git hash of the build you're using.
+
+### Gradle
+To add a dependency on Iceberg in Gradle, add the following to `build.gradle`:
+```
+dependencies {
+ compile 'org.apache.iceberg:iceberg-core:0.7.0-incubating'
+}
+```
+
+### Maven
+If you'd like to try out Iceberg in a Maven project using the Spark Iceberg
API, you can add the `iceberg-spark-runtime` dependency to your `pom.xml` file:
+```xml
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-spark-runtime</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+```
+
+You'll also need `spark-sql` to read tables:
+```xml
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>2.4.4</version>
+ </dependency>
+```
+
+### Using the API
+For examples of how to use the Iceberg API, see:
+
+- [Spark](api-quickstart.md)
+- [Java](java-api-quickstart.md)
diff --git a/site/docs/img/partition-spec-evolution.png b/site/docs/img/partition-spec-evolution.png
new file mode 100644
index 0000000..0bc595f
Binary files /dev/null and b/site/docs/img/partition-spec-evolution.png differ
diff --git a/site/docs/api-quickstart.md b/site/docs/java-api-quickstart.md
similarity index 66%
copy from site/docs/api-quickstart.md
copy to site/docs/java-api-quickstart.md
index 4e369d3..8bcf080 100644
--- a/site/docs/api-quickstart.md
+++ b/site/docs/java-api-quickstart.md
@@ -15,7 +15,7 @@
- limitations under the License.
-->
-# API Quickstart
+# Java API Quickstart
## Create a table
@@ -25,47 +25,41 @@ Tables are created using either a
[`Catalog`](/javadoc/master/index.html?org/apa
The Hive catalog connects to a Hive MetaStore to keep track of Iceberg tables.
This example uses Spark's Hadoop configuration to get a Hive catalog:
-```scala
-import org.apache.iceberg.hive.HiveCatalog
+```java
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
-val catalog = new HiveCatalog(spark.sessionState.newHadoopConf())
+Catalog catalog = new HiveCatalog(spark.sparkContext().hadoopConfiguration());
```
The `Catalog` interface defines methods for working with tables, like
`createTable`, `loadTable`, `renameTable`, and `dropTable`.
To create a table, pass an `Identifier` and a `Schema` along with other
initial metadata:
-```scala
-val name = TableIdentifier.of("logging", "logs")
-val table = catalog.createTable(name, schema, spec)
+```java
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
-// write into the new logs table with Spark 2.4
-logsDF.write
- .format("iceberg")
- .mode("append")
- .save("logging.logs")
+TableIdentifier name = TableIdentifier.of("logging", "logs");
+Table table = catalog.createTable(name, schema, spec);
```
The logs [schema](#create-a-schema) and [partition
spec](#create-a-partition-spec) are created below.
+
### Using Hadoop tables
Iceberg also supports tables that are stored in a directory in HDFS or the
local file system. Directory tables don't support all catalog operations, like
rename, so they use the `Tables` interface instead of `Catalog`.
To create a table in HDFS, use `HadoopTables`:
-```scala
-import org.apache.iceberg.hadoop.HadoopTables
-
-val tables = new HadoopTables(spark.sessionState.newHadoopConf())
-
-val table = tables.create(schema, spec, "hdfs:/tables/logging/logs")
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.Table;
-// write into the new logs table with Spark 2.4
-logsDF.write
- .format("iceberg")
- .mode("append")
- .save("hdfs:/tables/logging/logs")
+Configuration conf = new Configuration();
+HadoopTables tables = new HadoopTables(conf);
+Table table = tables.create(schema, spec, "hdfs:/tables/logging/logs");
```
!!! Warning
@@ -88,16 +82,16 @@ To read and write to tables from Spark see:
This example creates a schema for a `logs` table:
-```scala
-import org.apache.iceberg.Schema
-import org.apache.iceberg.types.Types._
+```java
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
-val schema = new Schema(
- NestedField.required(1, "level", StringType.get()),
- NestedField.required(2, "event_time", TimestampType.withZone()),
- NestedField.required(3, "message", StringType.get()),
- NestedField.optional(4, "call_stack", ListType.ofRequired(5,
StringType.get()))
- )
+Schema schema = new Schema(
+ Types.NestedField.required(1, "level", Types.StringType.get()),
+ Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
+ Types.NestedField.required(3, "message", Types.StringType.get()),
+ Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
+ );
```
When using the Iceberg API directly, type IDs are required. Conversions from
other schema formats, like Spark, Avro, and Parquet will automatically assign
new IDs.
@@ -108,26 +102,25 @@ When a table is created, all IDs in the schema are
re-assigned to ensure uniquen
To create an Iceberg schema from an existing Avro schema, use converters in
`AvroSchemaUtil`:
-```scala
-import org.apache.avro.Schema.Parser
-import org.apache.iceberg.avro.AvroSchemaUtil
+```java
+import org.apache.avro.Schema.Parser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
-val avroSchema = new Parser().parse("""{"type": "record", ... }""")
-
-val icebergSchema = AvroSchemaUtil.toIceberg(avroSchema)
+org.apache.avro.Schema avroSchema = new Parser().parse("{\"type\": \"record\" , ... }");
+Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
```
### Convert a schema from Spark
To create an Iceberg schema from an existing table, use converters in
`SparkSchemaUtil`:
-```scala
-import org.apache.iceberg.spark.SparkSchemaUtil
+```java
+import org.apache.iceberg.spark.SparkSchemaUtil;
-val schema = SparkSchemaUtil.convert(spark.table("db.table").schema)
+Schema schema = SparkSchemaUtil.schemaForTable(spark, "db.table");
```
-
## Partitioning
### Create a partition spec
@@ -136,11 +129,13 @@ Partition specs describe how Iceberg should group records
into data files. Parti
This example creates a partition spec for the `logs` table that partitions
records by the hour of the log event's timestamp and by log level:
-```scala
-import org.apache.iceberg.PartitionSpec
+```java
+import org.apache.iceberg.PartitionSpec;
-val spec = PartitionSpec.builderFor(schema)
- .hour("event_time")
- .identity("level")
- .build()
+PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .hour("event_time")
+ .identity("level")
+ .build();
```
+
+For more information on the different partition transforms that Iceberg
offers, visit [this page](../spec#partitioning).
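Putting the pieces of the Java quickstart together, a minimal end-to-end sketch might look like the following. It assumes `iceberg-core` and Hadoop's `hadoop-common` are on the classpath, and it uses a local temporary directory instead of `hdfs:/tables/logging/logs` so that it can run without HDFS. The class name `LogsTableSketch` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class LogsTableSketch {
  // Creates the example `logs` table as a directory-based (Hadoop) table at `location`.
  static Table createLogsTable(String location) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "level", Types.StringType.get()),
        Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
        Types.NestedField.required(3, "message", Types.StringType.get()),
        Types.NestedField.optional(4, "call_stack",
            Types.ListType.ofRequired(5, Types.StringType.get())));

    // Partition by the hour of event_time and by level, as in the quickstart.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .hour("event_time")
        .identity("level")
        .build();

    HadoopTables tables = new HadoopTables(new Configuration());
    return tables.create(schema, spec, location);
  }

  public static void main(String[] args) throws IOException {
    // A fresh local directory stands in for HDFS here.
    String location = Files.createTempDirectory("logs").toString();
    Table table = createLogsTable(location);
    System.out.println(table.schema());
    System.out.println(table.spec());
  }
}
```

Column IDs may be reassigned when the table is created, so code that needs them should read them back from `table.schema()` rather than relying on the literals above.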
diff --git a/site/mkdocs.yml b/site/mkdocs.yml
index 9c4a6e6..68ad141 100644
--- a/site/mkdocs.yml
+++ b/site/mkdocs.yml
@@ -46,26 +46,27 @@ nav:
- Disclaimer: disclaimer.md
- How to Release: how-to-release.md
- User docs:
+ - Getting Started: getting-started.md
+ - Configuration: configuration.md
- Schemas: schemas.md
- Partitioning: partitioning.md
- - Configuration: configuration.md
- Performance: performance.md
- Reliability: reliability.md
- Table evolution: evolution.md
- - Time travel: spark#time-travel
- - Quickstart: api-quickstart.md
+ - Time Travel: spark#time-travel
+ - Spark Quickstart: api-quickstart.md
- Spark: spark.md
- Presto: presto.md
- Java:
- Git Repo: https://github.com/apache/incubator-iceberg
- - Quickstart: api-quickstart.md
+ - Quickstart: java-api-quickstart.md
- API intro: api.md
- Javadoc: /javadoc/
- Custom Catalog: custom-catalog.md
- Python:
- Git Repo: https://github.com/apache/incubator-iceberg/tree/master/python
- Quickstart: python-quickstart.md
- - API intro: python-api-intro.md
+ - API Intro: python-api-intro.md
- Feature Support: python-feature-support.md
- Format:
- Definitions: terms.md
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java b/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
new file mode 100644
index 0000000..4c1530c
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This class tests how Iceberg handles concurrency when reading and writing
at the same time
+ */
+public class ConcurrencyTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(ConcurrencyTest.class);
+
+ private Schema schema = new Schema(
+ optional(1, "key", Types.LongType.get()),
+ optional(2, "value", Types.StringType.get())
+ );
+ private SparkSession spark;
+ private File tableLocation;
+ private Table table;
+
+ private List<SimpleRecord> data = new ArrayList<>();
+
+ @Before
+ public void before() throws IOException {
+ tableLocation = Files.createTempDirectory("temp").toFile();
+
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+ spark.sparkContext().setLogLevel("WARN");
+
+ HadoopTables tables = new
HadoopTables(spark.sparkContext().hadoopConfiguration());
+ table = tables.create(schema, tableLocation.toString());
+
+ for (int i = 0; i < 1000000; i++) {
+ data.add(new SimpleRecord(1, "bdp"));
+ }
+
+ log.info("End of setup phase");
+ }
+
+ /**
+ * The test creates 500 read tasks and one long-running write (1 million rows)
+ * and runs them concurrently on a fixed pool of 5 threads.
+ */
+ @Test
+ public void writingAndReadingConcurrently() throws InterruptedException {
+ ExecutorService threadPool = Executors.newFixedThreadPool(5);
+ List<Callable<Void>> tasks = new ArrayList<>();
+
+ Callable<Void> write = () -> writeToTable(data);
+ tasks.add(write);
+
+ for (int i = 0; i < 500; i++) {
+ Callable<Void> getReads = () -> readTable();
+ tasks.add(getReads);
+ }
+
+ threadPool.invokeAll(tasks);
+ threadPool.shutdown();
+
+ table.refresh();
+ readTable();
+ }
+
+ private Void readTable() {
+ Dataset<Row> results = spark.read()
+ .format("iceberg")
+ .load(tableLocation.toString());
+
+ log.info("" + results.count());
+ return null;
+ }
+
+ private Void writeToTable(List<SimpleRecord> writeData) {
+ log.info("WRITING!");
+ Dataset<Row> df = spark.createDataFrame(writeData, SimpleRecord.class);
+ df.select("key", "value").write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation.toString());
+ return null;
+ }
+
+ @After
+ public void after() throws IOException {
+ spark.stop();
+ FileUtils.deleteDirectory(tableLocation);
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/examples/README.md b/spark/src/test/java/org/apache/iceberg/examples/README.md
new file mode 100644
index 0000000..6bab285
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/README.md
@@ -0,0 +1,194 @@
+# Iceberg Java API Examples (with Spark)
+
+## About
+Welcome! :smile:
+
+If you've stumbled across this module, hopefully you're looking for some
guidance on how to get started with the [Apache
Iceberg](https://iceberg.apache.org/) table format. This set of classes
collects code examples of how to use the Iceberg Java API with Spark, along
with some extra detail here in the README.
+
+The examples are structured as JUnit tests that you can download and run
locally if you want to mess around with Iceberg yourself.
+
+## Using Iceberg
+### Maven
+If you'd like to try out Iceberg in your own project using Spark, you can use
the `iceberg-spark-runtime` dependency:
+```xml
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-spark-runtime</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+```
+
+You'll also need `spark-sql`:
+```xml
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>2.4.4</version>
+ </dependency>
+```
+
+### Gradle
+To add a dependency on Iceberg in Gradle, add the following to `build.gradle`:
+```
+dependencies {
+ compile 'org.apache.iceberg:iceberg-core:0.7.0-incubating'
+}
+```
+
+## Key features investigated
+The following section will break down the different areas of Iceberg explored
in the examples, with links to the code and extra information that could be
useful for new users.
+
+### Writing data to tables
+There are multiple ways of creating tables with Iceberg, including using the
Hive Metastore to keep track of tables
([HiveCatalog](https://iceberg.apache.org/api-quickstart/#using-a-hive-catalog)),
or using HDFS / your local file system
([HadoopTables](https://iceberg.incubator.apache.org/api-quickstart/#using-hadoop-tables))
to store the tables. However, it should be noted that directory tables (such
as those using `HadoopTables`) don’t support all catalog operations, like
rename and t [...]
+Note that `HadoopTables` _shouldn’t_ be used with file systems that do not support atomic rename, as Iceberg depends on atomic rename to synchronize concurrent commits.
+To limit complexity, these examples create tables on your local file system
using the `HadoopTables` class.
+
+To create an Iceberg `Table`, use the Iceberg API to define a `Schema` (and, for a partitioned table, a `PartitionSpec`) and create the table with `HadoopTables`. You can then write data to it using a Spark `DataFrameWriter`.
+
+Code examples can be found [here](ReadAndWriteTablesTest.java).
+
+#### A quick look at file structures
+When writing partitioned data, Iceberg lays out your files in a similar manner to Hive:
+
+```
+├── data
+│ ├── published_month=2017-09
+│ │ └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00002.parquet
+│ ├── published_month=2018-09
+│ │ └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00001.parquet
+│ ├── published_month=2018-11
+│ │ └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00000.parquet
+│ └── published_month=null
+│ └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00003.parquet
+└── metadata
+ └── version-hint.text
+```
+**WARNING**
+You cannot simply drag and drop data files into an Iceberg table like the one shown above and expect to see your data in the table.
+Each file is tracked individually and managed by Iceberg, so it must be written into the table using the Iceberg API.
+
+### Reading data from tables
+Reading Iceberg tables is fairly simple using the Spark `DataFrameReader`.
+
+Code examples can be found [here](ReadAndWriteTablesTest.java).
+
+### A look at the metadata
+This section looks a little bit closer at the metadata produced by Iceberg
tables. Consider an example where you've written some data to a table. Your
files will look something like this:
+
+```
+├── data
+│ └── ...
+└── metadata
+ ├── 51accd1d-39c7-4a6e-8f35-9e05f7c67864-m0.avro
+ ├── snap-1335014336004891572-1-51accd1d-39c7-4a6e-8f35-9e05f7c67864.avro
+ ├── v1.metadata.json
+ ├── v2.metadata.json
+ └── version-hint.text
+```
+
+The metadata for your table is kept in JSON files, and each commit to a table produces a new metadata file. For tables using a metastore for the metadata, the file used is whichever file the metastore points at. For `HadoopTables`, the file used is the latest version available. Look [here](https://iceberg.incubator.apache.org/spec/#table-metadata) for more information on metadata.
+
+The metadata file will contain things like the table location, the schema and
the partition spec:
+
+```json
+{
+ "format-version" : 1,
+ "table-uuid" : "f31aa6d7-acc3-4365-b737-4ef028a60bc1",
+ "location" :
"/var/folders/sg/ypkyhl2s0p18qcd10ddpkn0c0000gn/T/temp5216691795982307214",
+ "last-updated-ms" : 1572972868185,
+ "last-column-id" : 2,
+ "schema" : {
+ "type" : "struct",
+ "fields" : [ {
+ ...
+ } ]
+ },
+ "partition-spec" : [ {
+ ...
+ } ],
+ "default-spec-id" : 0,
+ "partition-specs" : [ {
+ ...
+ } ]
+ "properties" : { },
+ "current-snapshot-id" : -1,
+ "snapshots" : [ ],
+ "snapshot-log" : [ ]
+}
+```
+
+When you then add your first chunk of data, you get a new version of the
metadata (`v2.metadata.json`) that is the same as the first version except for
the snapshot section at the bottom, which gets updated to:
+
+```json
+"current-snapshot-id" : 8405273199394950821,
+ "snapshots" : [ {
+ "snapshot-id" : 8405273199394950821,
+ "timestamp-ms" : 1572972873293,
+ "summary" : {
+ "operation" : "append",
+ "spark.app.id" : "local-1572972867758",
+ "added-data-files" : "4",
+ "added-records" : "4",
+ "changed-partition-count" : "4",
+ "total-records" : "4",
+ "total-data-files" : "4"
+ },
+ "manifest-list" :
"/var/folders/sg/ypkyhl2s0p18qcd10ddpkn0c0000gn/T/temp5216691795982307214/metadata/snap-8405273199394950821-1-5706fc75-31e1-404e-aa23-b493387e2e32.avro"
+ } ],
+ "snapshot-log" : [ {
+ "timestamp-ms" : 1572972873293,
+ "snapshot-id" : 8405273199394950821
+ } ]
+```
+
+Here you get information on the data you have just written to the table, such
as `added-records` and `added-data-files` as well as where the manifest list is
located.
+
+
+### Snapshot based functionality
+Iceberg uses [snapshots](https://iceberg.apache.org/terms/#snapshot) as part
of its implementation, and provides a lot of useful functionality from this,
such as **time travel**.
+
+- Iceberg creates a new snapshot for all table operations that modify the
table, such as appends and overwrites.
+- You are able to access the whole list of snapshots generated for a table.
+- Iceberg will store all snapshots generated until you delete the snapshots
using the `ExpireSnapshots` API. Currently, this must be called by the user.
+ - **NOTE**: A VACUUM operation with Spark is in the works for a future
release to make this process easier.
+ - You can delete all snapshots earlier than a certain timestamp.
+ - You can delete snapshots based on `SnapshotID` values.
+- You can read data from an old snapshot using the `SnapshotID` or a timestamp
value ([time travel](https://iceberg.apache.org/spark/#time-travel)).
+- You can roll back your data to an earlier snapshot.
+
+Code examples can be found [here](SnapshotFunctionalityTest.java).
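As a rough illustration of the snapshot operations listed above, here is a sketch that does not need Spark. It assumes `iceberg-core` and `hadoop-common` are on the classpath. To produce snapshots without writing real Parquet files, it appends metadata only for hypothetical files built with `DataFiles.builder(...)`; those files never exist on disk, which is fine for metadata operations but not for reading data. The class name and file paths are hypothetical. Time-travel reads themselves go through Spark, as described in the linked time travel docs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class SnapshotSketch {
  // Metadata for a hypothetical Parquet file; the file itself is never written.
  static DataFile fakeFile(PartitionSpec spec, String path) {
    return DataFiles.builder(spec)
        .withPath(path)
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(1024L)
        .withRecordCount(10L)
        .build();
  }

  // Returns the IDs of the snapshots left after expiring the first one.
  static List<Long> run(String location) {
    Schema schema = new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get()));
    Table table = new HadoopTables(new Configuration())
        .create(schema, PartitionSpec.unpartitioned(), location);

    // Every commit that modifies the table (here, two appends) creates a new snapshot.
    table.newAppend().appendFile(fakeFile(table.spec(), location + "/data/a.parquet")).commit();
    long firstSnapshotId = table.currentSnapshot().snapshotId();
    table.newAppend().appendFile(fakeFile(table.spec(), location + "/data/b.parquet")).commit();

    // Snapshots are kept until expired explicitly; expire the first one by ID.
    table.expireSnapshots().expireSnapshotId(firstSnapshotId).commit();

    List<Long> remaining = new ArrayList<>();
    for (Snapshot snapshot : table.snapshots()) {
      remaining.add(snapshot.snapshotId());
    }
    return remaining;
  }

  public static void main(String[] args) throws IOException {
    String location = Files.createTempDirectory("snapshots").toString();
    System.out.println("Remaining snapshots: " + run(location));
  }
}
```

`ExpireSnapshots` also offers `expireOlderThan(long timestampMillis)` for the timestamp-based cleanup mentioned in the list.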
+
+### Table schema evolution
+Iceberg provides support to handle schema evolution of your tables over time:
+
+1. Add a new column
+ 1. The new column is always added at the end of the schema, not somewhere in the middle (**NOTE**: This will be addressed with Spark 3.0, which has implemented `AFTER` and `FIRST` operations).
+ 1. Any rows using the earlier schema return a `null` value for this new
column. You cannot use an alternative default value.
+ 1. This column automatically becomes an `optional` column, meaning adding
data to this column isn't enforced for each future write.
+1. Delete a column
+ 1. When you delete a column, that column will no longer be available in
any of your previous snapshots. So, use this with caution :sweat_smile:
+1. Update a column
+ 1. Certain type promotions can be made (such as `int` -> `long`). For a
definitive list, see the [official
documentation](https://iceberg.apache.org/spec/#schemas-and-data-types).
+1. Rename a column
+ 1. When you rename a column, it will appear renamed in all earlier
versions of snapshots.
+
+Code examples can be found [here](SchemaEvolutionTest.java).
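The four kinds of change listed above map onto the `UpdateSchema` API returned by `Table.updateSchema()`. Here is a minimal sketch, assuming `iceberg-core` and `hadoop-common` are on the classpath and using a hypothetical three-column starting schema (not the one used in `SchemaEvolutionTest`):

```java
import java.io.IOException;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class SchemaEvolutionSketch {
  // Applies one change of each kind and returns the resulting table schema.
  static Schema evolve(String location) {
    // Hypothetical starting schema.
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "author", Types.StringType.get()),
        Types.NestedField.optional(3, "genre", Types.StringType.get()));
    Table table = new HadoopTables(new Configuration()).create(schema, location);

    // Add: the new column goes at the end of the schema and is optional.
    table.updateSchema().addColumn("publisher", Types.StringType.get()).commit();

    // Update: int -> long is one of the allowed type promotions.
    table.updateSchema().updateColumn("id", Types.LongType.get()).commit();

    // Rename: columns are tracked by ID, so older data is read under the new name.
    table.updateSchema().renameColumn("author", "writer").commit();

    // Delete.
    table.updateSchema().deleteColumn("genre").commit();

    return table.schema();
  }

  public static void main(String[] args) throws IOException {
    String location = Files.createTempDirectory("schema-evolution").toString();
    System.out.println(evolve(location));
  }
}
```

Each change is committed separately here for readability; several changes can also be chained on a single `updateSchema()` call before `commit()`.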
+
+### Optimistic concurrency
+[Optimistic
concurrency](https://en.wikipedia.org/wiki/Optimistic_concurrency_control) is
when a system assumes that multiple writers can write to the same table without
interfering with each other. This is usually used in environments where there
is low data contention. It means that locking of the table isn't used, allowing
multiple writers to write to the table at the same time.
+
+However, this means you occasionally need to deal with concurrent writer conflicts. These happen when multiple writers start writing to a table at the same time, but one finishes first and commits an update. When the second writer then tries to commit, its commit cannot be applied as-is, because the table is no longer in the state it was in when that writer started.
+
+Iceberg deals with this by retrying the commit against the new metadata. If the second write doesn't touch any of the files changed by the first write, it's deemed safe to commit the second update.
+
+[This test](ConcurrencyTest.java) experiments with how optimistic concurrency works. For more information on conflict resolution, look [here](https://iceberg.incubator.apache.org/spec/#table-metadata), and for information on write concurrency, look [here](https://iceberg.incubator.apache.org/reliability/#concurrent-write-operations).
+
+By default, Iceberg sets the `commit.retry.num-retries` table property to **4**. You can change this by calling `table.updateProperties()`, which returns an `UpdateProperties` object, setting a new value for the property, and committing:
+
+```java
+ table.updateProperties().set("commit.retry.num-retries", "1").commit();
+```
+
+You can find more information on other table properties you can configure
[here](https://iceberg.incubator.apache.org/configuration/#table-properties).
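To make the retry behaviour described above more concrete, here is a toy model of an optimistic commit in plain Java. It is not Iceberg's implementation: the table's metadata pointer is modelled as an `AtomicReference`, each writer re-applies its append to whatever metadata is current and then attempts an atomic compare-and-swap, retrying up to a limit that plays the role of `commit.retry.num-retries`. Appends never conflict with each other, so every retry here is safe; real conflict checks (for example, for overwrites) are out of scope. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class OptimisticCommitSketch {
  // Immutable toy "table metadata": a version number plus the files it tracks.
  static final class Metadata {
    final int version;
    final List<String> files;

    Metadata(int version, List<String> files) {
      this.version = version;
      this.files = Collections.unmodifiableList(files);
    }
  }

  // The single mutable pointer to the current metadata; no locks are taken.
  private final AtomicReference<Metadata> pointer =
      new AtomicReference<>(new Metadata(0, new ArrayList<>()));

  Metadata current() {
    return pointer.get();
  }

  // Appends one file, retrying up to maxRetries times if another writer commits first.
  // Returns false if all attempts lost the race.
  boolean commitAppend(String newFile, int maxRetries) {
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      Metadata base = pointer.get();                 // the state this attempt starts from
      List<String> files = new ArrayList<>(base.files);
      files.add(newFile);                            // re-apply this writer's change on top of base
      Metadata updated = new Metadata(base.version + 1, files);
      if (pointer.compareAndSet(base, updated)) {    // succeeds only if base is still current
        return true;
      }
      // Another writer committed in between: loop and retry against the new metadata.
    }
    return false;
  }

  // Runs `writers` threads that each append one file, and returns the final metadata.
  static Metadata runConcurrently(int writers, int maxRetries) {
    OptimisticCommitSketch table = new OptimisticCommitSketch();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < writers; i++) {
      String file = "data-" + i + ".parquet";
      Thread thread = new Thread(() -> table.commitAppend(file, maxRetries));
      threads.add(thread);
      thread.start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return table.current();
  }

  public static void main(String[] args) {
    Metadata result = runConcurrently(4, 4);
    System.out.println("version=" + result.version + ", files=" + result.files.size());
    // prints version=4, files=4
  }
}
```

A writer's compare-and-swap can only fail when some other writer has just succeeded, so with N concurrent writers each one needs at most N - 1 retries; with fewer retries allowed, `commitAppend` may give up and return `false`.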
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java b/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
new file mode 100644
index 0000000..0f49574
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This test class uses Spark to create partitioned and unpartitioned tables
locally.
+ */
+public class ReadAndWriteTablesTest {
+
+ private SparkSession spark;
+ private Table table;
+ private HadoopTables tables;
+ private File pathToTable;
+ private Schema schema;
+
+ @Before
+ public void before() throws IOException {
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+
+ pathToTable = Files.createTempDirectory("temp").toFile();
+ tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+
+ schema = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ }
+
+ @Test
+ public void createUnpartitionedTable() {
+ table = tables.create(schema, pathToTable.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(pathToTable.toString());
+
+ table.refresh();
+ }
+
+ @Test
+ public void createPartitionedTable() {
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .identity("id")
+ .build();
+
+ table = tables.create(schema, spec, pathToTable.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(pathToTable.toString());
+
+ table.refresh();
+ }
+
+ @Test
+ public void writeDataFromJsonFile() {
+ Schema bookSchema = new Schema(
+ optional(1, "title", Types.StringType.get()),
+ optional(2, "price", Types.LongType.get()),
+ optional(3, "author", Types.StringType.get()),
+ optional(4, "published", Types.TimestampType.withZone()),
+ optional(5, "genre", Types.StringType.get())
+ );
+
+ table = tables.create(bookSchema, pathToTable.toString());
+
+ Dataset<Row> df = spark.read().json("src/test/resources/data/books.json");
+
+ df.select(df.col("title"), df.col("price"), df.col("author"),
+ df.col("published").cast(DataTypes.TimestampType),
df.col("genre")).write()
+ .format("iceberg")
+ .mode("append")
+ .save(pathToTable.toString());
+
+ table.refresh();
+ }
+
+ @Test
+ public void readFromIcebergTableWithSpark() {
+ table = tables.create(schema, pathToTable.toString());
+
+ Dataset<Row> results = spark.read()
+ .format("iceberg")
+ .load(pathToTable.toString());
+
+ results.createOrReplaceTempView("table");
+ spark.sql("select * from table").show();
+ }
+
+ @Test
+ public void readFromPartitionedTableWithFilter() {
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .identity("id")
+ .build();
+
+ table = tables.create(schema, spec, pathToTable.toString());
+
+ Dataset<Row> results = spark.read()
+ .format("iceberg")
+ .load(pathToTable.toString())
+ .filter("data != \"b\"");
+
+ results.createOrReplaceTempView("table");
+ spark.sql("SELECT * FROM table").show();
+ }
+
+ @After
+ public void after() throws IOException {
+ FileUtils.deleteDirectory(pathToTable);
+ spark.stop();
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java b/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
new file mode 100644
index 0000000..f689f53
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This class tests how you can evolve your table schema with Iceberg.
+ * This includes things like adding, deleting, renaming columns and type promotions.
+ */
+public class SchemaEvolutionTest {
+
+ private static final Logger log = LoggerFactory.getLogger(SchemaEvolutionTest.class);
+
+ private SparkSession spark;
+ private Table table;
+ private File tableLocation;
+ private String dataLocation = "src/test/resources/data/";
+
+ @Before
+ public void before() throws IOException {
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+ tableLocation = Files.createTempDirectory("temp").toFile();
+ Schema schema = new Schema(
+ optional(1, "title", Types.StringType.get()),
+ optional(2, "price", Types.IntegerType.get()),
+ optional(3, "author", Types.StringType.get()),
+ optional(4, "published", Types.TimestampType.withZone()),
+ optional(5, "genre", Types.StringType.get())
+ );
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .year("published")
+ .build();
+
+ HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+ table = tables.create(schema, spec, tableLocation.toString());
+
+ Dataset<Row> df = spark.read().json(dataLocation + "books.json");
+
+ df.select(df.col("title"), df.col("price").cast(DataTypes.IntegerType),
+ df.col("author"), df.col("published").cast(DataTypes.TimestampType),
+ df.col("genre")).write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation.toString());
+
+ table.refresh();
+ }
+
+ @Test
+ public void addColumnToSchema() {
+ table.updateSchema().addColumn("publisher", Types.StringType.get()).commit();
+
+ Dataset<Row> df2 = spark.read().json(dataLocation + "new-books.json");
+
+ df2.select(df2.col("title"), df2.col("price").cast(DataTypes.IntegerType),
+ df2.col("author"), df2.col("published").cast(DataTypes.TimestampType),
+ df2.col("genre"), df2.col("publisher")).write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation.toString());
+ }
+
+ @Test
+ public void deleteColumnFromSchema() {
+ table.updateSchema().deleteColumn("genre").commit();
+
+ table.refresh();
+ Dataset<Row> results = spark.read()
+ .format("iceberg")
+ .load(tableLocation.toString());
+
+ results.createOrReplaceTempView("table");
+ spark.sql("select * from table").show();
+ }
+
+ @Test
+ public void renameColumn() {
+ table.updateSchema().renameColumn("author", "writer").commit();
+
+ table.refresh();
+ Dataset<Row> results = spark.read()
+ .format("iceberg")
+ .load(tableLocation.toString());
+
+ results.createOrReplaceTempView("table");
+ spark.sql("select * from table").show();
+ }
+
+ @Test
+ public void updateColumnTypeIntToLong() {
+ table.updateSchema().updateColumn("price", Types.LongType.get()).commit();
+
+ log.info("Promote int type to long type:\n" + table.schema().toString());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void updateColumnTypeIntToString() {
+ table.updateSchema().updateColumn("price", Types.StringType.get()).commit();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void updateColumnTypeStringToInt() {
+ table.updateSchema().updateColumn("author", Types.IntegerType.get()).commit();
+ }
+
+ @Test
+ public void floatToDouble() throws IOException {
+ // Set up a new table to test this conversion
+ Schema schema = new Schema(optional(1, "float", Types.FloatType.get()));
+ File location = Files.createTempDirectory("temp").toFile();
+ HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+ Table floatTable = tables.create(schema, location.toString());
+
+ floatTable.updateSchema().updateColumn("float", Types.DoubleType.get()).commit();
+
+ log.info("Promote float type to double type:\n" + floatTable.schema().toString());
+ }
+
+ @Test
+ public void widenDecimalPrecision() throws IOException {
+ // Set up a new table to test this conversion
+ Schema schema = new Schema(optional(1, "decimal", Types.DecimalType.of(2, 2)));
+ File location = Files.createTempDirectory("temp").toFile();
+ HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+ Table decimalTable = tables.create(schema, location.toString());
+
+ decimalTable.updateSchema().updateColumn("decimal", Types.DecimalType.of(4, 2)).commit();
+
+ log.info("Widen decimal type:\n" + decimalTable.schema().toString());
+ }
+
+ @org.junit.After
+ public void after() throws IOException {
+ spark.stop();
+ FileUtils.deleteDirectory(tableLocation);
+ }
+}
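The promotion tests above exercise Iceberg's widening rules: int to long, float to double, and decimal(P, S) to decimal(P', S) with a larger precision and the same scale, while int to string and string to int are rejected. A minimal stdlib-only sketch of those checks follows; the class `PromotionRules` and its string encoding of types are hypothetical and are not Iceberg's API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of the widening rules exercised by SchemaEvolutionTest.
// Types are plain strings ("int", "long", "decimal(9,2)", ...); this is a
// sketch of the documented rules, NOT Iceberg's own implementation.
final class PromotionRules {
  private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");

  private PromotionRules() {
  }

  static boolean canPromote(String from, String to) {
    if (from.equals(to)) {
      return true; // treated as a no-op update in this sketch
    }
    if (from.equals("int") && to.equals("long")) {
      return true;
    }
    if (from.equals("float") && to.equals("double")) {
      return true;
    }
    Matcher f = DECIMAL.matcher(from);
    Matcher t = DECIMAL.matcher(to);
    if (f.matches() && t.matches()) {
      int fromPrecision = Integer.parseInt(f.group(1));
      int fromScale = Integer.parseInt(f.group(2));
      int toPrecision = Integer.parseInt(t.group(1));
      int toScale = Integer.parseInt(t.group(2));
      // Scale must stay fixed; precision may only grow.
      return fromScale == toScale && toPrecision >= fromPrecision;
    }
    return false; // e.g. int -> string, string -> int, long -> int
  }

  public static void main(String[] args) {
    System.out.println(canPromote("int", "long"));                  // true
    System.out.println(canPromote("float", "double"));              // true
    System.out.println(canPromote("decimal(2,2)", "decimal(4,2)")); // true
    System.out.println(canPromote("int", "string"));                // false
    System.out.println(canPromote("string", "int"));                // false
  }
}
```

Keeping the rule table this small mirrors why the two `expected = IllegalArgumentException.class` tests fail: neither change is a lossless widening.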
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java b/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
new file mode 100644
index 0000000..f7394f3
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import com.google.common.base.Objects;
+
+public class SimpleRecord {
+ private Integer id;
+ private String data;
+
+ public SimpleRecord() {
+ }
+
+ SimpleRecord(Integer id, String data) {
+ this.id = id;
+ this.data = data;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SimpleRecord record = (SimpleRecord) o;
+ return Objects.equal(id, record.id) && Objects.equal(data, record.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id, data);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("{\"id\"=");
+ buffer.append(id);
+ buffer.append(",\"data\"=\"");
+ buffer.append(data);
+ buffer.append("\"}");
+ return buffer.toString();
+ }
+}
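`SimpleRecord` is a plain Java bean, so `spark.createDataFrame(expected, SimpleRecord.class)` can derive the `id` and `data` columns from its getters. The same equality contract can be written without Guava by using `java.util.Objects`; `Objects.hash` and Guava's `Objects.hashCode` both delegate to `Arrays.hashCode`, so hash values match. The class name `SimpleRecordSketch` is hypothetical and only illustrates the pattern.

```java
import java.util.Objects;

// Hypothetical stdlib-only variant of SimpleRecord: same fields and the same
// equals/hashCode/toString behaviour, using java.util.Objects instead of Guava.
final class SimpleRecordSketch {
  private Integer id;
  private String data;

  SimpleRecordSketch() {
  }

  SimpleRecordSketch(Integer id, String data) {
    this.id = id;
    this.data = data;
  }

  public Integer getId() {
    return id;
  }

  public String getData() {
    return data;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SimpleRecordSketch other = (SimpleRecordSketch) o;
    return Objects.equals(id, other.id) && Objects.equals(data, other.data);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, data); // same value as Arrays.hashCode(new Object[] {id, data})
  }

  @Override
  public String toString() {
    return "{\"id\"=" + id + ",\"data\"=\"" + data + "\"}";
  }

  public static void main(String[] args) {
    SimpleRecordSketch a = new SimpleRecordSketch(1, "a");
    SimpleRecordSketch b = new SimpleRecordSketch(1, "a");
    System.out.println(a.equals(b)); // true
    System.out.println(a);           // {"id"=1,"data"="a"}
  }
}
```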
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java b/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
new file mode 100644
index 0000000..4649d9a
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This class tests the snapshot functionality available with Iceberg.
+ * This includes things like time-travel, rollback and retrieving metadata.
+ */
+public class SnapshotFunctionalityTest {
+
+ private static final Logger log = LoggerFactory.getLogger(SnapshotFunctionalityTest.class);
+
+ private Table table;
+ private File tableLocation;
+ private SparkSession spark = null;
+
+ @Before
+ public void before() throws IOException {
+ Schema schema = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+
+ tableLocation = Files.createTempDirectory("temp").toFile();
+
+ HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ table = tables.create(schema, spec, tableLocation.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ for (int i = 0; i < 5; i++) {
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation.toString());
+ }
+ table.refresh();
+ }
+
+ @Test
+ public void rollbackToPreviousSnapshotAndReadData() {
+ long oldId = table.history().get(0).snapshotId();
+
+ table.rollback().toSnapshotId(oldId).commit();
+ table.refresh();
+
+ Dataset<Row> results = spark.read()
+ .format("iceberg")
+ .load(tableLocation.toString());
+
+ results.createOrReplaceTempView("table");
+ spark.sql("select * from table").show();
+ }
+
+ @Test
+ public void expireOldSnapshotWithSnapshotID() {
+ long oldId = table.history().get(0).snapshotId();
+
+ table.expireSnapshots().expireSnapshotId(oldId).commit();
+ table.refresh();
+
+ Iterator<Snapshot> iterator = table.snapshots().iterator();
+ List<Snapshot> snapshots = IteratorUtils.toList(iterator);
+ }
+
+ /**
+ * Expires every snapshot older than the given timestamp, NOT including a snapshot committed at exactly that timestamp.
+ */
+ @Test
+ public void retireAllSnapshotsOlderThanTimestamp() {
+ long thirdOldestTimestamp = table.history().get(2).timestampMillis();
+ Iterator<Snapshot> beforeIterator = table.snapshots().iterator();
+ List<Snapshot> beforeSnapshots = IteratorUtils.toList(beforeIterator);
+
+ // Expire the 2 oldest snapshots (strictly older than the third-oldest one)
+ table.expireSnapshots().expireOlderThan(thirdOldestTimestamp).commit();
+ table.refresh();
+
+ Iterator<Snapshot> afterIterator = table.snapshots().iterator();
+ List<Snapshot> afterSnapshots = IteratorUtils.toList(afterIterator);
+ }
+
+ @Test
+ public void getInfoAboutFilesAddedFromSnapshot() {
+ Snapshot snapshot = table.currentSnapshot();
+ Iterable<DataFile> addedFiles = snapshot.addedFiles();
+
+ for (DataFile dataFile : addedFiles) {
+ log.info("File path: " + dataFile.path());
+ log.info("File format: " + dataFile.format());
+ log.info("File size in bytes: " + dataFile.fileSizeInBytes());
+ log.info("Record count: " + dataFile.recordCount());
+ }
+ }
+
+ @After
+ public void after() throws IOException {
+ FileUtils.deleteDirectory(tableLocation);
+ spark.stop();
+ }
+}
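`retireAllSnapshotsOlderThanTimestamp` relies on the cutoff being strict: a snapshot whose timestamp equals the cutoff survives. Because `table.history()` is ordered oldest first and `before()` performs five appends, `history().get(2)` is the third-oldest entry, so only the two oldest snapshots are expired. The stdlib-only sketch below models just that selection over hypothetical `(snapshotId, timestampMillis)` pairs; it is not Iceberg's implementation, which also protects the current snapshot and handles file cleanup.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a strict "older than" cutoff: entries with
// timestamp < cutoff are dropped, entries at or after the cutoff are kept.
// Not Iceberg code; it only illustrates the selection the test depends on.
final class ExpireSketch {
  static final class SnapshotEntry {
    final long snapshotId;
    final long timestampMillis;

    SnapshotEntry(long snapshotId, long timestampMillis) {
      this.snapshotId = snapshotId;
      this.timestampMillis = timestampMillis;
    }
  }

  private ExpireSketch() {
  }

  /** Returns the ids that survive the cutoff; the input is ordered oldest first. */
  static List<Long> retainedIds(List<SnapshotEntry> history, long cutoffMillis) {
    List<Long> kept = new ArrayList<>();
    for (SnapshotEntry entry : history) {
      if (entry.timestampMillis >= cutoffMillis) {
        kept.add(entry.snapshotId);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    // Five appends, as in before(): ids 1..5 at increasing timestamps.
    List<SnapshotEntry> history = new ArrayList<>();
    for (int i = 1; i <= 5; i++) {
      history.add(new SnapshotEntry(i, 1000L * i));
    }
    long cutoff = history.get(2).timestampMillis; // third-oldest entry
    System.out.println(retainedIds(history, cutoff)); // [3, 4, 5]
  }
}
```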
diff --git a/spark/src/test/resources/data/books.json b/spark/src/test/resources/data/books.json
new file mode 100644
index 0000000..902b4e3
--- /dev/null
+++ b/spark/src/test/resources/data/books.json
@@ -0,0 +1,6 @@
+{"title":"Gone", "price":12, "author": "Michael Grant", "published": 1541776051, "genre": "fiction"}
+{"title":"Carry On", "price":10, "author": "Rainbow Rowell", "published": 1536505651, "genre": "fiction"}
+{"title":"Wayward Son", "price":12, "author": "Rainbow Rowell", "published": 1504969651, "genre": "fiction"}
+{"title":"Heroes", "price":8, "author": "Stephen Fry", "published": 1504969651, "genre": "fiction"}
+{"title":"Vietnam", "price":15, "author": "Max Hastings", "genre": "non-fiction"}
+
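`SchemaEvolutionTest` partitions this data with `year("published")`. The `published` values are epoch seconds, which Spark's cast to `TimestampType` reads as seconds, and Iceberg's year transform records a partition value counted as years since 1970. A minimal stdlib-only sketch of that mapping for these rows follows, assuming UTC; the class `YearPartitionSketch` is hypothetical and is not Iceberg's transform code.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical sketch of a year transform applied to the epoch-second
// "published" values in books.json: the UTC year, expressed as years since
// 1970. A missing value maps to a null partition value.
final class YearPartitionSketch {
  private YearPartitionSketch() {
  }

  static Integer yearPartition(Long epochSeconds) {
    if (epochSeconds == null) {
      return null; // e.g. the "Vietnam" row, which has no "published" field
    }
    int year = Instant.ofEpochSecond(epochSeconds).atZone(ZoneOffset.UTC).getYear();
    return year - 1970;
  }

  public static void main(String[] args) {
    System.out.println(yearPartition(1541776051L)); // 48 (2018, "Gone")
    System.out.println(yearPartition(1536505651L)); // 48 (2018, "Carry On")
    System.out.println(yearPartition(1504969651L)); // 47 (2017)
    System.out.println(yearPartition(null));        // null
  }
}
```

Under this mapping the four dated rows land in two year partitions (2017 and 2018), and the undated row lands in the null partition.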
diff --git a/spark/src/test/resources/data/new-books.json b/spark/src/test/resources/data/new-books.json
new file mode 100644
index 0000000..3418151
--- /dev/null
+++ b/spark/src/test/resources/data/new-books.json
@@ -0,0 +1,4 @@
+{"title":"Harry Potter", "price":12, "author": "JK Rowling", "published": 1570719361, "genre": "fiction", "publisher": "ACME Books"}
+{"title":"Percy Jackson", "price":10, "author": "Rick Riordan", "published": 1547132161, "genre": "fiction", "publisher": "ACME Books"}
+{"title":"Cookie", "price":8, "author": "Jacqueline Wilson", "published": 1552229761, "genre": "fiction", "publisher": "ACME Books"}
+{"title":"Fangirl", "price":12, "author": "Rainbow Rowell", "published": 1552229761, "genre": "fiction", "publisher": "ACME Books"}