This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 549da80  Store split offsets for Parquet files (#186)
549da80 is described below

commit 549da809490976b53a13b14596dd240ed74bce5e
Author: Samarth Jain <[email protected]>
AuthorDate: Thu May 16 14:39:07 2019 -0700

    Store split offsets for Parquet files (#186)
---
 api/src/main/java/org/apache/iceberg/DataFile.java | 11 +++-
 .../java/org/apache/iceberg/io/FileAppender.java   | 10 ++++
 .../test/java/org/apache/iceberg/TestHelpers.java  |  5 ++
 .../main/java/org/apache/iceberg/DataFiles.java    | 64 ++++++----------------
 .../java/org/apache/iceberg/GenericDataFile.java   | 41 ++++++--------
 .../{ParquetMetrics.java => ParquetUtil.java}      | 22 ++++++--
 .../iceberg/parquet/ParquetWriteAdapter.java       |  8 ++-
 .../org/apache/iceberg/parquet/ParquetWriter.java  |  9 ++-
 ...estParquetMetrics.java => TestParquetUtil.java} | 13 ++---
 .../org/apache/iceberg/spark/source/Writer.java    |  7 ++-
 .../org/apache/iceberg/spark/SparkTableUtil.scala  |  6 +-
 .../iceberg/spark/source/TestParquetScan.java      |  4 +-
 .../iceberg/spark/source/TestParquetWrite.java     | 16 +++++-
 13 files changed, 119 insertions(+), 97 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index b8c02be..d4c1b67 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -58,8 +58,9 @@ public interface DataFile {
             IntegerType.get(), BinaryType.get())),
         optional(128, "upper_bounds", MapType.ofRequired(129, 130,
             IntegerType.get(), BinaryType.get())),
-        optional(131, "key_metadata", BinaryType.get())
-        // NEXT ID TO ASSIGN: 132
+        optional(131, "key_metadata", BinaryType.get()),
+        optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
+        // NEXT ID TO ASSIGN: 134
     );
   }
 
@@ -136,4 +137,10 @@ public interface DataFile {
    * @return a copy of this data file
    */
   DataFile copy();
+
+  /**
+   * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
+   * information is used for planning scan tasks whose boundaries are determined by these offsets.
+   */
+  List<Long> splitOffsets();
 }
diff --git a/api/src/main/java/org/apache/iceberg/io/FileAppender.java b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
index b98859a..74229cc 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileAppender.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.io;
 
 import java.io.Closeable;
 import java.util.Iterator;
+import java.util.List;
 import org.apache.iceberg.Metrics;
 
 public interface FileAppender<D> extends Closeable {
@@ -45,4 +46,13 @@ public interface FileAppender<D> extends Closeable {
    * @return the length of this file. Only valid after the file is closed.
    */
   long length();
+
+  /**
+   * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
+   * information is used for planning scan tasks whose boundaries are determined by these offsets.
+   * Only valid after the file is closed.
+   */
+  default List<Long> splitOffsets() {
+    return null;
+  }
 }
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index ab92634..d92f789 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -376,5 +376,10 @@ public class TestHelpers {
     public DataFile copy() {
       return this;
     }
+
+    @Override
+    public List<Long> splitOffsets() {
+      return null;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 9a3111a..1819b64 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Locale;
@@ -106,19 +107,12 @@ public class DataFiles {
         location, format, partition, rowCount, stat.getLen());
   }
 
-  public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics) {
-    String location = stat.getPath().toString();
-    FileFormat format = FileFormat.fromFileName(location);
-    return new GenericDataFile(
-        location, format, partition, stat.getLen(), metrics);
-  }
-
   public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics,
-      EncryptionKeyMetadata keyMetadata) {
+      EncryptionKeyMetadata keyMetadata, List<Long> splitOffsets) {
     String location = stat.getPath().toString();
     FileFormat format = FileFormat.fromFileName(location);
     return new GenericDataFile(
-        location, format, partition, stat.getLen(), metrics, keyMetadata.buffer());
+        location, format, partition, stat.getLen(), metrics, keyMetadata.buffer(), splitOffsets);
   }
 
   public static DataFile fromInputFile(InputFile file, PartitionData partition, long rowCount) {
@@ -132,17 +126,6 @@ public class DataFiles {
         location, format, partition, rowCount, file.getLength());
   }
 
-  public static DataFile fromInputFile(InputFile file, PartitionData partition, Metrics metrics) {
-    if (file instanceof HadoopInputFile) {
-      return fromStat(((HadoopInputFile) file).getStat(), partition, metrics);
-    }
-
-    String location = file.location();
-    FileFormat format = FileFormat.fromFileName(location);
-    return new GenericDataFile(
-        location, format, partition, file.getLength(), metrics);
-  }
-
   public static DataFile fromInputFile(InputFile file, long rowCount) {
     if (file instanceof HadoopInputFile) {
       return fromStat(((HadoopInputFile) file).getStat(), rowCount);
@@ -154,37 +137,17 @@ public class DataFiles {
   }
 
   public static DataFile fromEncryptedOutputFile(EncryptedOutputFile encryptedFile, PartitionData partition,
-                                                Metrics metrics) {
+                                                Metrics metrics, List<Long> splitOffsets) {
     EncryptionKeyMetadata keyMetadata = encryptedFile.keyMetadata();
     InputFile file = encryptedFile.encryptingOutputFile().toInputFile();
     if (encryptedFile instanceof HadoopInputFile) {
-      return fromStat(((HadoopInputFile) file).getStat(), partition, metrics, keyMetadata);
+      return fromStat(((HadoopInputFile) file).getStat(), partition, metrics, keyMetadata, splitOffsets);
     }
 
     String location = file.location();
     FileFormat format = FileFormat.fromFileName(location);
     return new GenericDataFile(
-        location, format, partition, file.getLength(), metrics, keyMetadata.buffer());
-  }
-
-  public static DataFile fromParquetInputFile(InputFile file,
-                                              PartitionData partition,
-                                              Metrics metrics) {
-    if (file instanceof HadoopInputFile) {
-      return fromParquetStat(((HadoopInputFile) file).getStat(), partition, metrics);
-    }
-
-    String location = file.location();
-    FileFormat format = FileFormat.PARQUET;
-    return new GenericDataFile(
-        location, format, partition, file.getLength(), metrics);
-  }
-
-  public static DataFile fromParquetStat(FileStatus stat, PartitionData partition, Metrics metrics) {
-    String location = stat.getPath().toString();
-    FileFormat format = FileFormat.PARQUET;
-    return new GenericDataFile(
-        location, format, partition, stat.getLen(), metrics);
+        location, format, partition, file.getLength(), metrics, keyMetadata.buffer(), splitOffsets);
   }
 
   public static Builder builder(PartitionSpec spec) {
@@ -211,6 +174,7 @@ public class DataFiles {
     private Map<Integer, ByteBuffer> lowerBounds = null;
     private Map<Integer, ByteBuffer> upperBounds = null;
     private ByteBuffer keyMetadata = null;
+    private List<Long> splitOffsets = null;
 
     public Builder() {
       this.spec = null;
@@ -237,6 +201,7 @@ public class DataFiles {
       this.nullValueCounts = null;
       this.lowerBounds = null;
       this.upperBounds = null;
+      this.splitOffsets = null;
     }
 
     public Builder copy(DataFile toCopy) {
@@ -254,6 +219,7 @@ public class DataFiles {
       this.upperBounds = toCopy.upperBounds();
       this.keyMetadata = toCopy.keyMetadata() == null ? null
           : ByteBuffers.copy(toCopy.keyMetadata());
+      this.splitOffsets = toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets());
       return this;
     }
 
@@ -327,6 +293,11 @@ public class DataFiles {
       return this;
     }
 
+    public Builder withSplitOffsets(List<Long> offsets) {
+      this.splitOffsets = offsets == null ? null : ImmutableList.copyOf(offsets);
+      return this;
+    }
+
     public Builder withEncryptionKeyMetadata(ByteBuffer newKeyMetadata) {
       this.keyMetadata = newKeyMetadata;
       return this;
@@ -336,10 +307,6 @@ public class DataFiles {
       return withEncryptionKeyMetadata(newKeyMetadata.buffer());
     }
 
-    public Builder withEncryptionKeyMetadata(byte[] newKeyMetadata) {
-      return withEncryptionKeyMetadata(ByteBuffer.wrap(newKeyMetadata));
-    }
-
     public DataFile build() {
       Preconditions.checkArgument(filePath != null, "File path is required");
       if (format == null) {
@@ -352,7 +319,8 @@ public class DataFiles {
       return new GenericDataFile(
           filePath, format, isPartitioned ? partitionData.copy() : null,
           fileSizeInBytes, new Metrics(
-              recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds), keyMetadata);
+              recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
+          keyMetadata, splitOffsets);
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index e38217d..7544808 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -63,6 +63,7 @@ class GenericDataFile
   private Map<Integer, Long> nullValueCounts = null;
   private Map<Integer, ByteBuffer> lowerBounds = null;
   private Map<Integer, ByteBuffer> upperBounds = null;
+  private List<Long> splitOffsets = null;
   private transient ByteBuffer keyMetadata = null;
 
   // cached schema
@@ -115,14 +116,6 @@ class GenericDataFile
     this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
     this.recordCount = recordCount;
     this.fileSizeInBytes = fileSizeInBytes;
-    this.fileOrdinal = null;
-    this.sortColumns = null;
-    this.columnSizes = null;
-    this.valueCounts = null;
-    this.nullValueCounts = null;
-    this.lowerBounds = null;
-    this.upperBounds = null;
-    this.fromProjectionPos = null;
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
@@ -133,18 +126,10 @@ class GenericDataFile
     this.partitionType = partition.getPartitionType();
     this.recordCount = recordCount;
     this.fileSizeInBytes = fileSizeInBytes;
-    this.fileOrdinal = null;
-    this.sortColumns = null;
-    this.columnSizes = null;
-    this.valueCounts = null;
-    this.nullValueCounts = null;
-    this.lowerBounds = null;
-    this.upperBounds = null;
-    this.fromProjectionPos = null;
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
-                  long fileSizeInBytes, Metrics metrics) {
+                  long fileSizeInBytes, Metrics metrics, List<Long> splitOffsets) {
     this.filePath = filePath;
     this.format = format;
 
@@ -160,20 +145,18 @@ class GenericDataFile
     // this will throw NPE if metrics.recordCount is null
     this.recordCount = metrics.recordCount();
     this.fileSizeInBytes = fileSizeInBytes;
-    this.fileOrdinal = null;
-    this.sortColumns = null;
     this.columnSizes = metrics.columnSizes();
     this.valueCounts = metrics.valueCounts();
     this.nullValueCounts = metrics.nullValueCounts();
     this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
     this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
-    this.fromProjectionPos = null;
+    this.splitOffsets = copy(splitOffsets);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long fileSizeInBytes, Metrics metrics,
-                  ByteBuffer keyMetadata) {
-    this(filePath, format, partition, fileSizeInBytes, metrics);
+                  ByteBuffer keyMetadata, List<Long> splitOffsets) {
+    this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
     this.keyMetadata = keyMetadata;
   }
 
@@ -199,6 +182,7 @@ class GenericDataFile
     this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
     this.fromProjectionPos = toCopy.fromProjectionPos;
     this.keyMetadata = toCopy.keyMetadata == null ? null : ByteBuffers.copy(toCopy.keyMetadata);
+    this.splitOffsets = copy(toCopy.splitOffsets);
   }
 
   /**
@@ -273,6 +257,11 @@ class GenericDataFile
   }
 
   @Override
+  public List<Long> splitOffsets() {
+    return splitOffsets;
+  }
+
+  @Override
   public org.apache.avro.Schema getSchema() {
     if (avroSchema == null) {
       this.avroSchema = getAvroSchema(partitionType);
@@ -331,6 +320,9 @@ class GenericDataFile
       case 13:
         this.keyMetadata = (ByteBuffer) v;
         return;
+      case 14:
+        this.splitOffsets = (List<Long>) v;
+        return;
       default:
         // ignore the object, it must be from a newer version of the format
     }
@@ -379,6 +371,8 @@ class GenericDataFile
         return upperBounds;
       case 13:
         return keyMetadata;
+      case 14:
+        return splitOffsets;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
     }
@@ -398,7 +392,7 @@ class GenericDataFile
 
   @Override
   public int size() {
-    return 14;
+    return 15;
   }
 
   @Override
@@ -415,6 +409,7 @@ class GenericDataFile
         .add("lower_bounds", lowerBounds)
         .add("upper_bounds", upperBounds)
         .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
+        .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
         .toString();
   }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
similarity index 90%
rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 678d24f..ecf9c8d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -19,11 +19,12 @@
 
 package org.apache.iceberg.parquet;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,19 +47,20 @@ import org.apache.parquet.schema.MessageType;
 
 import static org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive;
 
-public class ParquetMetrics implements Serializable {
-  private ParquetMetrics() {
+public class ParquetUtil {
+  // not meant to be instantiated
+  private ParquetUtil() {
   }
 
-  public static Metrics fromInputFile(InputFile file) {
+  public static Metrics fileMetrics(InputFile file) {
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) {
-      return fromMetadata(reader.getFooter());
+      return footerMetrics(reader.getFooter());
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read footer of file: %s", file);
     }
   }
 
-  public static Metrics fromMetadata(ParquetMetadata metadata) {
+  public static Metrics footerMetrics(ParquetMetadata metadata) {
     long rowCount = 0;
     Map<Integer, Long> columnSizes = Maps.newHashMap();
     Map<Integer, Long> valueCounts = Maps.newHashMap();
@@ -107,6 +109,14 @@ public class ParquetMetrics implements Serializable {
         toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
   }
 
+  public static List<Long> getSplitOffsets(ParquetMetadata md) {
+    List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
+    for (BlockMetaData blockMetaData : md.getBlocks()) {
+      splitOffsets.add(blockMetaData.getStartingPos());
+    }
+    return ImmutableList.copyOf(splitOffsets);
+  }
+
   // we allow struct nesting, but not maps or arrays
   private static boolean shouldStoreBounds(ColumnPath columnPath, Schema schema) {
     Iterator<String> pathIterator = columnPath.iterator();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
index 7d31e0c..292eb89 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.parquet;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.List;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
@@ -47,7 +48,7 @@ public class ParquetWriteAdapter<D> implements FileAppender<D> {
   @Override
   public Metrics metrics() {
     Preconditions.checkState(footer != null, "Cannot produce metrics until closed");
-    return ParquetMetrics.fromMetadata(footer);
+    return ParquetUtil.footerMetrics(footer);
   }
 
   @Override
@@ -58,6 +59,11 @@ public class ParquetWriteAdapter<D> implements FileAppender<D> {
   }
 
   @Override
+  public List<Long> splitOffsets() {
+    return ParquetUtil.getSplitOffsets(writer.getFooter());
+  }
+
+  @Override
   public void close() throws IOException {
     if (writer != null) {
       writer.close();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 8ad126e..c7bd6e2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.parquet;
 import com.google.common.collect.ImmutableMap;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
@@ -44,7 +45,6 @@ import org.apache.parquet.schema.MessageType;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
 
 class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtor = DynConstructors
@@ -115,7 +115,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
   @Override
   public Metrics metrics() {
-    return ParquetMetrics.fromMetadata(writer.getFooter());
+    return ParquetUtil.footerMetrics(writer.getFooter());
   }
 
   @Override
@@ -127,6 +127,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     }
   }
 
+  @Override
+  public List<Long> splitOffsets() {
+    return ParquetUtil.getSplitOffsets(writer.getFooter());
+  }
+
   private void checkSize() {
     if (recordCount >= nextCheckRecordCount) {
       long bufferedSize = writeStore.getBufferedSize();
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
similarity index 96%
rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
rename to parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
index ec03850..1b681d3 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
@@ -60,8 +60,7 @@ import static org.apache.iceberg.types.Conversions.fromByteBuffer;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-public class TestParquetMetrics extends BaseParquetWritingTest {
-
+public class TestParquetUtil extends BaseParquetWritingTest {
   private final UUID uuid = UUID.randomUUID();
   private final GenericFixed fixed = new GenericData.Fixed(
       org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
@@ -116,7 +115,7 @@ public class TestParquetMetrics extends BaseParquetWritingTest {
 
     File parquetFile = writeRecords(schema, firstRecord, secondRecord);
 
-    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
     Assert.assertEquals(2L, (long) metrics.recordCount());
     assertCounts(1, 2L, 0L, metrics);
     assertBounds(1, BooleanType.get(), false, true, metrics);
@@ -163,7 +162,7 @@ public class TestParquetMetrics extends BaseParquetWritingTest {
 
     File parquetFile = writeRecords(schema, record);
 
-    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
     assertCounts(1, 1L, 0L, metrics);
     assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new BigDecimal("2.55"), metrics);
@@ -200,7 +199,7 @@ public class TestParquetMetrics extends BaseParquetWritingTest {
 
     File parquetFile = writeRecords(schema, record);
 
-    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
     assertCounts(1, 1L, 0L, metrics);
     assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics);
@@ -235,7 +234,7 @@ public class TestParquetMetrics extends BaseParquetWritingTest {
 
     File parquetFile = writeRecords(schema, record);
 
-    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
     assertCounts(1, 1, 0, metrics);
     assertBounds(1, IntegerType.get(), null, null, metrics);
@@ -259,7 +258,7 @@ public class TestParquetMetrics extends BaseParquetWritingTest {
 
     File parquetFile = writeRecords(schema, firstRecord, secondRecord);
 
-    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
     Assert.assertEquals(2L, (long) metrics.recordCount());
     assertCounts(1, 2, 2, metrics);
     assertBounds(1, IntegerType.get(), null, null, metrics);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 4124136..c202671 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -244,6 +244,7 @@ class Writer implements DataSourceWriter {
     private final FileIO fileIo;
     private FileAppender<InternalRow> appender = null;
     private Metrics metrics = null;
+    private List<Long> offsetRanges = null;
     private final EncryptedOutputFile file;
 
     UnpartitionedWriter(
@@ -265,6 +266,7 @@ class Writer implements DataSourceWriter {
     public WriterCommitMessage commit() throws IOException {
       Preconditions.checkArgument(appender != null, "Commit called on a closed writer: %s", this);
 
+      // metrics and splitOffsets are populated on close
       close();
 
       if (metrics.recordCount() == 0L) {
@@ -272,7 +274,7 @@ class Writer implements DataSourceWriter {
         return new TaskCommit();
       }
 
-      DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null, metrics);
+      DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null, metrics, offsetRanges);
 
       return new TaskCommit(dataFile);
     }
@@ -290,6 +292,7 @@ class Writer implements DataSourceWriter {
       if (this.appender != null) {
         this.appender.close();
         this.metrics = appender.metrics();
+        this.offsetRanges = appender.splitOffsets();
         this.appender = null;
       }
     }
@@ -371,12 +374,14 @@ class Writer implements DataSourceWriter {
         currentAppender.close();
         // metrics are only valid after the appender is closed
         Metrics metrics = currentAppender.metrics();
+        List<Long> splitOffsets = currentAppender.splitOffsets();
         this.currentAppender = null;
 
         DataFile dataFile = DataFiles.builder(spec)
             .withEncryptedOutputFile(currentFile)
             .withPartition(currentKey)
             .withMetrics(metrics)
+            .withSplitOffsets(splitOffsets)
             .build();
 
         completedPartitions.add(currentKey);
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index ca835d6..86dfbe3 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -21,17 +21,15 @@ package org.apache.iceberg.spark
 
 import java.nio.ByteBuffer
 import java.util
-
 import com.google.common.collect.Maps
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.iceberg.parquet.ParquetUtil
 import org.apache.iceberg.{DataFile, DataFiles, Metrics, PartitionSpec}
-import org.apache.iceberg.parquet.ParquetMetrics
 import org.apache.iceberg.spark.hacks.Hive
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
-
 import scala.collection.JavaConverters._
 
 object SparkTableUtil {
@@ -236,7 +234,7 @@ object SparkTableUtil {
     val fs = partition.getFileSystem(conf)
 
     fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      val metrics = ParquetMetrics.fromMetadata(ParquetFileReader.readFooter(conf, stat))
+      val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat))
 
       SparkDataFile(
         stat.getPath.toString,
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
index 0475f40..ced85d4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
@@ -51,7 +51,7 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.Files.localOutput;
-import static org.apache.iceberg.parquet.ParquetMetrics.fromInputFile;
+import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
 
 public class TestParquetScan extends AvroDataTest {
   private static final Configuration CONF = new Configuration();
@@ -106,7 +106,7 @@ public class TestParquetScan extends AvroDataTest {
     DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
         .withFileSizeInBytes(parquetFile.length())
         .withPath(parquetFile.toString())
-        .withMetrics(fromInputFile(localInput(parquetFile)))
+        .withMetrics(fileMetrics(localInput(parquetFile)))
         .build();
 
     table.newAppend().appendFile(file).commit();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index 59504b1..59d8e7f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -24,6 +24,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -40,6 +43,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public class TestParquetWrite {
@@ -96,8 +100,18 @@ public class TestParquetWrite {
         .load(location.toString());
 
     List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
-
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
+    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+      for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+        Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+        Assert.assertEquals("Should have reported record count as 1" , 1, file.recordCount());
+        Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
+        Assert.assertNotNull("Counts metric not present", file.valueCounts());
+        Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
+        Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
+        Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
+      }
+    }
   }
 }

Reply via email to