This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d376b  Core: Add schema-id to snapshots (#2275)
d4d376b is described below

commit d4d376bb559e8dc83da3cb7ecbb8ff9205d68f21
Author: yyanyy <[email protected]>
AuthorDate: Mon Jun 28 17:24:47 2021 -0700

    Core: Add schema-id to snapshots (#2275)
---
 api/src/main/java/org/apache/iceberg/Snapshot.java |   9 ++
 api/src/main/java/org/apache/iceberg/Table.java    |   7 +
 .../test/java/org/apache/iceberg/TestHelpers.java  |  15 +++
 .../java/org/apache/iceberg/BaseMetadataTable.java |   5 +
 .../main/java/org/apache/iceberg/BaseSnapshot.java |  21 ++-
 .../main/java/org/apache/iceberg/BaseTable.java    |   5 +
 .../java/org/apache/iceberg/BaseTransaction.java   |   5 +
 .../java/org/apache/iceberg/SerializableTable.java |   5 +
 .../java/org/apache/iceberg/SnapshotParser.java    |  13 +-
 .../java/org/apache/iceberg/SnapshotProducer.java  |   4 +-
 .../java/org/apache/iceberg/TableTestBase.java     |   2 +
 .../test/java/org/apache/iceberg/TestSchemaID.java | 149 +++++++++++++++++++++
 .../java/org/apache/iceberg/TestSnapshotJson.java  |  29 +++-
 .../java/org/apache/iceberg/TestTableMetadata.java |  41 +++---
 core/src/test/resources/TableMetadataV2Valid.json  |  38 +++++-
 15 files changed, 315 insertions(+), 33 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java 
b/api/src/main/java/org/apache/iceberg/Snapshot.java
index 3fd8714..d10a6c3 100644
--- a/api/src/main/java/org/apache/iceberg/Snapshot.java
+++ b/api/src/main/java/org/apache/iceberg/Snapshot.java
@@ -126,4 +126,13 @@ public interface Snapshot extends Serializable {
    * @return the location of the manifest list for this Snapshot
    */
   String manifestListLocation();
+
+  /**
+   * Return the id of the schema used when this snapshot was created, or null 
if this information is not available.
+   *
+   * @return schema id associated with this snapshot
+   */
+  default Integer schemaId() {
+    return null;
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/Table.java 
b/api/src/main/java/org/apache/iceberg/Table.java
index a4557c8..854d68b 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -61,6 +61,13 @@ public interface Table {
   Schema schema();
 
   /**
+   * Return a map of schema ID to {@link Schema schema} for this table.
+   *
+   * @return this table's schemas, keyed by schema ID
+   */
+  Map<Integer, Schema> schemas();
+
+  /**
    * Return the {@link PartitionSpec partition spec} for this table.
    *
    * @return this table's partition spec
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java 
b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 69b5338..401266b 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -121,6 +121,21 @@ public class TestHelpers {
     Assert.assertEquals("History must match", expected.history(), 
actual.history());
   }
 
+  public static void assertSameSchemaMap(Map<Integer, Schema> map1, 
Map<Integer, Schema> map2) {
+    if (map1.size() != map2.size()) {
+      Assert.fail("Should have same number of schemas in both maps");
+    }
+
+    map1.forEach((schemaId, schema1) -> {
+      Schema schema2 = map2.get(schemaId);
+      Assert.assertNotNull(String.format("Schema ID %s does not exist in map: 
%s", schemaId, map2), schema2);
+
+      Assert.assertEquals("Should have matching schema id", 
schema1.schemaId(), schema2.schemaId());
+      Assert.assertTrue(String.format("Should be the same schema. Schema 1: 
%s, schema 2: %s", schema1, schema2),
+          schema1.sameSchema(schema2));
+    });
+  }
+
   private static class CheckReferencesBound extends 
ExpressionVisitors.ExpressionVisitor<Void> {
     private final String message;
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java 
b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 28a5a59..81ecaef 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -89,6 +89,11 @@ abstract class BaseMetadataTable implements Table, 
HasTableOperations, Serializa
   }
 
   @Override
+  public Map<Integer, Schema> schemas() {
+    return ImmutableMap.of(TableMetadata.INITIAL_SCHEMA_ID, schema());
+  }
+
+  @Override
   public PartitionSpec spec() {
     return spec;
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java 
b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 95e6d62..2641e37 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -43,6 +43,7 @@ class BaseSnapshot implements Snapshot {
   private final String manifestListLocation;
   private final String operation;
   private final Map<String, String> summary;
+  private final Integer schemaId;
 
   // lazily initialized
   private transient List<ManifestFile> allManifests = null;
@@ -56,9 +57,10 @@ class BaseSnapshot implements Snapshot {
    */
   BaseSnapshot(FileIO io,
                long snapshotId,
+               Integer schemaId,
                String... manifestFiles) {
     this(io, snapshotId, null, System.currentTimeMillis(), null, null,
-        Lists.transform(Arrays.asList(manifestFiles),
+        schemaId, Lists.transform(Arrays.asList(manifestFiles),
             path -> new GenericManifestFile(io.newInputFile(path), 0)));
   }
 
@@ -69,6 +71,7 @@ class BaseSnapshot implements Snapshot {
                long timestampMillis,
                String operation,
                Map<String, String> summary,
+               Integer schemaId,
                String manifestList) {
     this.io = io;
     this.sequenceNumber = sequenceNumber;
@@ -77,6 +80,7 @@ class BaseSnapshot implements Snapshot {
     this.timestampMillis = timestampMillis;
     this.operation = operation;
     this.summary = summary;
+    this.schemaId = schemaId;
     this.manifestListLocation = manifestList;
   }
 
@@ -86,8 +90,9 @@ class BaseSnapshot implements Snapshot {
                long timestampMillis,
                String operation,
                Map<String, String> summary,
+               Integer schemaId,
                List<ManifestFile> dataManifests) {
-    this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, 
operation, summary, null);
+    this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, 
operation, summary, schemaId, null);
     this.allManifests = dataManifests;
   }
 
@@ -121,6 +126,11 @@ class BaseSnapshot implements Snapshot {
     return summary;
   }
 
+  @Override
+  public Integer schemaId() {
+    return schemaId;
+  }
+
   private void cacheManifests() {
     if (allManifests == null) {
       // if manifests isn't set, then the snapshotFile is set and should be 
read to get the list
@@ -222,7 +232,8 @@ class BaseSnapshot implements Snapshot {
       return this.snapshotId == other.snapshotId() &&
           Objects.equal(this.parentId, other.parentId()) &&
           this.sequenceNumber == other.sequenceNumber() &&
-          this.timestampMillis == other.timestampMillis();
+          this.timestampMillis == other.timestampMillis() &&
+          Objects.equal(this.schemaId, other.schemaId());
     }
 
     return false;
@@ -234,7 +245,8 @@ class BaseSnapshot implements Snapshot {
       this.snapshotId,
       this.parentId,
       this.sequenceNumber,
-      this.timestampMillis
+      this.timestampMillis,
+      this.schemaId
     );
   }
 
@@ -246,6 +258,7 @@ class BaseSnapshot implements Snapshot {
         .add("operation", operation)
         .add("summary", summary)
         .add("manifest-list", manifestListLocation)
+        .add("schema-id", schemaId)
         .toString();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java 
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index f7e7540..b9886f0 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -70,6 +70,11 @@ public class BaseTable implements Table, HasTableOperations, 
Serializable {
   }
 
   @Override
+  public Map<Integer, Schema> schemas() {
+    return ops.current().schemasById();
+  }
+
+  @Override
   public PartitionSpec spec() {
     return ops.current().spec();
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 36cf53b..8c5d498 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -527,6 +527,11 @@ class BaseTransaction implements Transaction {
     }
 
     @Override
+    public Map<Integer, Schema> schemas() {
+      return current.schemasById();
+    }
+
+    @Override
     public PartitionSpec spec() {
       return current.spec();
     }
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java 
b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 8bfd76a..d8f623d 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -162,6 +162,11 @@ public class SerializableTable implements Table, 
Serializable {
   }
 
   @Override
+  public Map<Integer, Schema> schemas() {
+    return lazyTable().schemas();
+  }
+
+  @Override
   public PartitionSpec spec() {
     if (lazySpec == null) {
       synchronized (this) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java 
b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
index cf13c35..2675e96 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
@@ -46,6 +46,7 @@ public class SnapshotParser {
   private static final String OPERATION = "operation";
   private static final String MANIFESTS = "manifests";
   private static final String MANIFEST_LIST = "manifest-list";
+  private static final String SCHEMA_ID = "schema-id";
 
   static void toJson(Snapshot snapshot, JsonGenerator generator)
       throws IOException {
@@ -88,6 +89,11 @@ public class SnapshotParser {
       generator.writeEndArray();
     }
 
+    // schema ID might be null for snapshots written by old writers
+    if (snapshot.schemaId() != null) {
+      generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
+    }
+
     generator.writeEndObject();
   }
 
@@ -139,17 +145,20 @@ public class SnapshotParser {
       summary = builder.build();
     }
 
+    Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node);
+
     if (node.has(MANIFEST_LIST)) {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
-      return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, 
timestamp, operation, summary, manifestList);
+      return new BaseSnapshot(
+          io, sequenceNumber, snapshotId, parentId, timestamp, operation, 
summary, schemaId, manifestList);
 
     } else {
       // fall back to an embedded manifest list. pass in the manifest's 
InputFile so length can be
       // loaded lazily, if it is needed
       List<ManifestFile> manifests = 
Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
           location -> new GenericManifestFile(io.newInputFile(location), 0));
-      return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, 
summary, manifests);
+      return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, 
summary, schemaId, manifests);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index c9048f7..7825500 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -188,12 +188,12 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
 
       return new BaseSnapshot(ops.io(),
           sequenceNumber, snapshotId(), parentSnapshotId, 
System.currentTimeMillis(), operation(), summary(base),
-          manifestList.location());
+          base.currentSchemaId(), manifestList.location());
 
     } else {
       return new BaseSnapshot(ops.io(),
           snapshotId(), parentSnapshotId, System.currentTimeMillis(), 
operation(), summary(base),
-          manifests);
+          base.currentSchemaId(), manifests);
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java 
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 1e25922..c0f60d4 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -332,6 +332,8 @@ public class TableTestBase {
     }
 
     Assert.assertFalse("Should find all files in the manifest", 
newPaths.hasNext());
+
+    Assert.assertEquals("Schema ID should match", table.schema().schemaId(), 
(int) snap.schemaId());
   }
 
   void validateTableFiles(Table tbl, DataFile... expectedFiles) {
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaID.java 
b/core/src/test/java/org/apache/iceberg/TestSchemaID.java
new file mode 100644
index 0000000..f6c4979
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaID.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestSchemaID extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] { 1, 2 };
+  }
+
+  public TestSchemaID(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testNoChange() {
+    int onlyId = table.schema().schemaId();
+    Map<Integer, Schema> onlySchemaMap = schemaMap(table.schema());
+
+    // add files to table
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    TestHelpers.assertSameSchemaMap(onlySchemaMap, table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be the current",
+        table.schema().schemaId(), (int) table.currentSnapshot().schemaId());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(onlyId),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+
+    // remove file from table
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    TestHelpers.assertSameSchemaMap(onlySchemaMap, table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be the current",
+        table.schema().schemaId(), (int) table.currentSnapshot().schemaId());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(onlyId, onlyId),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+
+    // add file to table
+    table.newFastAppend().appendFile(FILE_A2).commit();
+
+    TestHelpers.assertSameSchemaMap(onlySchemaMap, table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be the current",
+        table.schema().schemaId(), (int) table.currentSnapshot().schemaId());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(onlyId, onlyId, onlyId),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+  }
+
+  @Test
+  public void testSchemaIdChangeInSchemaUpdate() {
+    Schema originalSchema = table.schema();
+
+    // add files to table
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    TestHelpers.assertSameSchemaMap(schemaMap(table.schema()), 
table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be the current",
+        table.schema().schemaId(), (int) table.currentSnapshot().schemaId());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(originalSchema.schemaId()),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+
+    // update schema
+    table.updateSchema().addColumn("data2", Types.StringType.get()).commit();
+
+    Schema updatedSchema = new Schema(1,
+        required(1, "id", Types.IntegerType.get()),
+        required(2, "data", Types.StringType.get()),
+        optional(3, "data2", Types.StringType.get())
+    );
+
+    TestHelpers.assertSameSchemaMap(schemaMap(originalSchema, updatedSchema), 
table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be old since 
update schema doesn't create new snapshot",
+        originalSchema.schemaId(), (int) table.currentSnapshot().schemaId());
+    Assert.assertEquals("Current schema should match", 
updatedSchema.asStruct(), table.schema().asStruct());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(originalSchema.schemaId()),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+
+    // remove file from table
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    TestHelpers.assertSameSchemaMap(schemaMap(originalSchema, updatedSchema), 
table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be the current",
+        updatedSchema.schemaId(), (int) table.currentSnapshot().schemaId());
+    Assert.assertEquals("Current schema should match", 
updatedSchema.asStruct(), table.schema().asStruct());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(originalSchema.schemaId(), updatedSchema.schemaId()),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+
+    // add files to table
+    table.newAppend().appendFile(FILE_A2).commit();
+
+    TestHelpers.assertSameSchemaMap(schemaMap(originalSchema, updatedSchema), 
table.schemas());
+    Assert.assertEquals("Current snapshot's schemaId should be the current",
+        updatedSchema.schemaId(), (int) table.currentSnapshot().schemaId());
+    Assert.assertEquals("Current schema should match", 
updatedSchema.asStruct(), table.schema().asStruct());
+
+    Assert.assertEquals("Schema ids should be correct in snapshots",
+        ImmutableList.of(originalSchema.schemaId(), updatedSchema.schemaId(), 
updatedSchema.schemaId()),
+        Lists.transform(Lists.newArrayList(table.snapshots()), 
Snapshot::schemaId));
+  }
+
+  private Map<Integer, Schema> schemaMap(Schema... schemas) {
+    return Arrays.stream(schemas).collect(Collectors.toMap(Schema::schemaId, 
Function.identity()));
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java 
b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 664cdc5..22f0b0c 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -39,7 +39,7 @@ public class TestSnapshotJson {
 
   @Test
   public void testJsonConversion() {
-    Snapshot expected = new BaseSnapshot(ops.io(), System.currentTimeMillis(),
+    Snapshot expected = new BaseSnapshot(ops.io(), System.currentTimeMillis(), 
1,
         "file:/tmp/manifest1.avro", "file:/tmp/manifest2.avro");
     String json = SnapshotParser.toJson(expected);
     Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
@@ -50,6 +50,23 @@ public class TestSnapshotJson {
         expected.allManifests(), snapshot.allManifests());
     Assert.assertNull("Operation should be null", snapshot.operation());
     Assert.assertNull("Summary should be null", snapshot.summary());
+    Assert.assertEquals("Schema ID should match", Integer.valueOf(1), 
snapshot.schemaId());
+  }
+
+  @Test
+  public void testJsonConversionWithoutSchemaId() {
+    Snapshot expected = new BaseSnapshot(ops.io(), System.currentTimeMillis(), 
null,
+        "file:/tmp/manifest1.avro", "file:/tmp/manifest2.avro");
+    String json = SnapshotParser.toJson(expected);
+    Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+
+    Assert.assertEquals("Snapshot ID should match",
+        expected.snapshotId(), snapshot.snapshotId());
+    Assert.assertEquals("Files should match",
+        expected.allManifests(), snapshot.allManifests());
+    Assert.assertNull("Operation should be null", snapshot.operation());
+    Assert.assertNull("Summary should be null", snapshot.summary());
+    Assert.assertNull("Schema ID should be null", snapshot.schemaId());
   }
 
   @Test
@@ -62,7 +79,7 @@ public class TestSnapshotJson {
 
     Snapshot expected = new BaseSnapshot(ops.io(), id, parentId, 
System.currentTimeMillis(),
         DataOperations.REPLACE, ImmutableMap.of("files-added", "4", 
"files-deleted", "100"),
-        manifests);
+        3, manifests);
 
     String json = SnapshotParser.toJson(expected);
     Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
@@ -83,6 +100,8 @@ public class TestSnapshotJson {
         expected.operation(), snapshot.operation());
     Assert.assertEquals("Summary should match",
         expected.summary(), snapshot.summary());
+    Assert.assertEquals("Schema ID should match",
+        expected.schemaId(), snapshot.schemaId());
   }
 
   @Test
@@ -102,9 +121,10 @@ public class TestSnapshotJson {
     }
 
     Snapshot expected = new BaseSnapshot(
-        ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, 
localInput(manifestList).location());
+        ops.io(), id, 34, parentId, System.currentTimeMillis(),
+        null, null, 4, localInput(manifestList).location());
     Snapshot inMemory = new BaseSnapshot(
-        ops.io(), id, parentId, expected.timestampMillis(), null, null, 
manifests);
+        ops.io(), id, parentId, expected.timestampMillis(), null, null, 4, 
manifests);
 
     Assert.assertEquals("Files should match in memory list",
         inMemory.allManifests(), expected.allManifests());
@@ -126,5 +146,6 @@ public class TestSnapshotJson {
         expected.allManifests(), snapshot.allManifests());
     Assert.assertNull("Operation should be null", snapshot.operation());
     Assert.assertNull("Summary should be null", snapshot.summary());
+    Assert.assertEquals("Schema ID should match", expected.schemaId(), 
snapshot.schemaId());
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java 
b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index c44ca63..d3cebec 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -89,11 +89,11 @@ public class TestTableMetadata {
   public void testJsonConversion() throws Exception {
     long previousSnapshotId = System.currentTimeMillis() - new 
Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
ImmutableList.of(
+        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), 
SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, ImmutableList.of(
+        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, 7, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
SPEC_5.specId())));
 
     List<HistoryEntry> snapshotLog = ImmutableList.<HistoryEntry>builder()
@@ -108,7 +108,6 @@ public class TestTableMetadata {
         SEQ_NO, System.currentTimeMillis(), 3,
         7, ImmutableList.of(TEST_SCHEMA, schema),
         5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
-
         3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", 
"value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, 
ImmutableList.of());
 
@@ -153,11 +152,15 @@ public class TestTableMetadata {
         (Long) previousSnapshotId, metadata.currentSnapshot().parentId());
     Assert.assertEquals("Current snapshot files should match",
         currentSnapshot.allManifests(), 
metadata.currentSnapshot().allManifests());
+    Assert.assertEquals("Schema ID for current snapshot should match",
+        (Integer) 7, metadata.currentSnapshot().schemaId());
     Assert.assertEquals("Previous snapshot ID should match",
         previousSnapshotId, 
metadata.snapshot(previousSnapshotId).snapshotId());
     Assert.assertEquals("Previous snapshot files should match",
         previousSnapshot.allManifests(),
         metadata.snapshot(previousSnapshotId).allManifests());
+    Assert.assertNull("Previous snapshot's schema ID should be null",
+        metadata.snapshot(previousSnapshotId).schemaId());
   }
 
   @Test
@@ -168,11 +171,11 @@ public class TestTableMetadata {
 
     long previousSnapshotId = System.currentTimeMillis() - new 
Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
ImmutableList.of(
+        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), 
spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, ImmutableList.of(
+        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
spec.specId())));
 
     TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
@@ -222,6 +225,8 @@ public class TestTableMetadata {
         (Long) previousSnapshotId, metadata.currentSnapshot().parentId());
     Assert.assertEquals("Current snapshot files should match",
         currentSnapshot.allManifests(), 
metadata.currentSnapshot().allManifests());
+    Assert.assertNull("Current snapshot's schema ID should be null",
+        metadata.currentSnapshot().schemaId());
     Assert.assertEquals("Previous snapshot ID should match",
         previousSnapshotId, 
metadata.snapshot(previousSnapshotId).snapshotId());
     Assert.assertEquals("Previous snapshot files should match",
@@ -229,6 +234,8 @@ public class TestTableMetadata {
         metadata.snapshot(previousSnapshotId).allManifests());
     Assert.assertEquals("Snapshot logs should match",
             expected.previousFiles(), metadata.previousFiles());
+    Assert.assertNull("Previous snapshot's schema ID should be null",
+        metadata.snapshot(previousSnapshotId).schemaId());
   }
 
   private static String toJsonWithoutSpecAndSchemaList(TableMetadata metadata) 
{
@@ -245,7 +252,7 @@ public class TestTableMetadata {
 
       // mimic an old writer by writing only schema and not the current ID or 
schema list
       generator.writeFieldName(SCHEMA);
-      SchemaParser.toJson(metadata.schema(), generator);
+      SchemaParser.toJson(metadata.schema().asStruct(), generator);
 
       // mimic an old writer by writing only partition-spec and not the 
default ID or spec list
       generator.writeFieldName(PARTITION_SPEC);
@@ -280,11 +287,11 @@ public class TestTableMetadata {
   public void testJsonWithPreviousMetadataLog() throws Exception {
     long previousSnapshotId = System.currentTimeMillis() - new 
Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
ImmutableList.of(
+        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), 
SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, ImmutableList.of(
+        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
@@ -311,11 +318,11 @@ public class TestTableMetadata {
   public void testAddPreviousMetadataRemoveNone() {
     long previousSnapshotId = System.currentTimeMillis() - new 
Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
ImmutableList.of(
+        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), 
SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, ImmutableList.of(
+        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
@@ -351,11 +358,11 @@ public class TestTableMetadata {
   public void testAddPreviousMetadataRemoveOne() {
     long previousSnapshotId = System.currentTimeMillis() - new 
Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
ImmutableList.of(
+        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), 
SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, ImmutableList.of(
+        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
@@ -403,11 +410,11 @@ public class TestTableMetadata {
   public void testAddPreviousMetadataRemoveMultiple() {
     long previousSnapshotId = System.currentTimeMillis() - new 
Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
ImmutableList.of(
+        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, 
null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), 
SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, ImmutableList.of(
+        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, 
null, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
@@ -707,7 +714,7 @@ public class TestTableMetadata {
         Types.NestedField.required(2, "x", Types.StringType.get())
     );
     TableMetadata sameSchemaTable = twoSchemasTable.updateSchema(sameSchema2, 
2);
-    Assert.assertEquals("Should return same table metadata",
+    Assert.assertSame("Should return same table metadata",
         twoSchemasTable, sameSchemaTable);
 
     // update schema with the same schema and different last column ID as 
current should create a new table
@@ -738,7 +745,7 @@ public class TestTableMetadata {
         Types.NestedField.required(4, "x", Types.StringType.get()),
         Types.NestedField.required(6, "z", Types.IntegerType.get())
     );
-    TableMetadata threeSchemaTable = revertSchemaTable.updateSchema(schema3, 
3);
+    TableMetadata threeSchemaTable = revertSchemaTable.updateSchema(schema3, 
6);
     Assert.assertEquals("Should have current schema id as 2",
         2, threeSchemaTable.currentSchemaId());
     assertSameSchemaList(ImmutableList.of(schema,
@@ -747,6 +754,6 @@ public class TestTableMetadata {
     Assert.assertEquals("Should have expected schema upon return",
         schema3.asStruct(), threeSchemaTable.schema().asStruct());
     Assert.assertEquals("Should return expected last column id",
-        3, threeSchemaTable.lastColumnId());
+        6, threeSchemaTable.lastColumnId());
   }
 }
diff --git a/core/src/test/resources/TableMetadataV2Valid.json 
b/core/src/test/resources/TableMetadataV2Valid.json
index d43e0a2..0dc89de 100644
--- a/core/src/test/resources/TableMetadataV2Valid.json
+++ b/core/src/test/resources/TableMetadataV2Valid.json
@@ -85,8 +85,38 @@
     }
   ],
   "properties": {},
-  "current-snapshot-id": -1,
-  "snapshots": [],
-  "snapshot-log": [],
+  "current-snapshot-id": 3055729675574597004,
+  "snapshots": [
+    {
+      "snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1515100955770,
+      "sequence-number": 0,
+      "summary": {
+        "operation": "append"
+      },
+      "manifest-list": "s3://a/b/1.avro"
+    },
+    {
+      "snapshot-id": 3055729675574597004,
+      "parent-snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1555100955770,
+      "sequence-number": 1,
+      "summary": {
+        "operation": "append"
+      },
+      "manifest-list": "s3://a/b/2.avro",
+      "schema-id": 1
+    }
+  ],
+  "snapshot-log": [
+    {
+      "snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1515100955770
+    },
+    {
+      "snapshot-id": 3055729675574597004,
+      "timestamp-ms": 1555100955770
+    }
+  ],
   "metadata-log": []
-}
+}
\ No newline at end of file

Reply via email to