This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fa54b3  feat(manifests): Adding implementation of manifest files (#3)
8fa54b3 is described below

commit 8fa54b32e3b191833305164a44456cf43c16cabd
Author: Matt Topol <[email protected]>
AuthorDate: Sat Sep 23 03:09:37 2023 -0400

    feat(manifests): Adding implementation of manifest files (#3)
---
 go.mod                   |  35 ++-
 go.sum                   | 117 ++++++-
 internal/avro_schemas.go | 568 +++++++++++++++++++++++++++++++++
 internal/mock_fs.go      |  85 +++++
 io/io.go                 | 248 +++++++++++++++
 go.mod => io/local.go    |  25 +-
 io/s3.go                 | 108 +++++++
 manifest.go              | 803 +++++++++++++++++++++++++++++++++++++++++++++++
 manifest_test.go         | 773 +++++++++++++++++++++++++++++++++++++++++++++
 9 files changed, 2731 insertions(+), 31 deletions(-)

diff --git a/go.mod b/go.mod
index 6bea61d..5e1aea8 100644
--- a/go.mod
+++ b/go.mod
@@ -20,14 +20,43 @@ module github.com/apache/iceberg-go
 go 1.20
 
 require (
+       github.com/aws/aws-sdk-go-v2 v1.20.3
+       github.com/aws/aws-sdk-go-v2/config v1.18.35
+       github.com/aws/aws-sdk-go-v2/credentials v1.13.34
+       github.com/aws/aws-sdk-go-v2/service/s3 v1.38.4
+       github.com/hamba/avro/v2 v2.14.1
        github.com/stretchr/testify v1.8.4
-       golang.org/x/exp v0.0.0-20230905200255-921286631fa9
+       github.com/wolfeidau/s3iofs v1.2.0
+       golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb
 )
 
 require (
+       github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // 
indirect
+       github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.10 // indirect
+       github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40 // indirect
+       github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34 // indirect
+       github.com/aws/aws-sdk-go-v2/internal/ini v1.3.41 // indirect
+       github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3 // indirect
+       github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14 
// indirect
+       github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.35 // 
indirect
+       github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.34 // 
indirect
+       github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.3 // 
indirect
+       github.com/aws/aws-sdk-go-v2/service/sso v1.13.4 // indirect
+       github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4 // indirect
+       github.com/aws/aws-sdk-go-v2/service/sts v1.21.4 // indirect
+       github.com/aws/smithy-go v1.14.2 // indirect
        github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/kr/pretty v0.3.1 // indirect
+       github.com/golang/mock v1.6.0 // indirect
+       github.com/golang/snappy v0.0.4 // indirect
+       github.com/json-iterator/go v1.1.12 // indirect
+       github.com/mitchellh/mapstructure v1.5.0 // indirect
+       github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
+       github.com/modern-go/reflect2 v1.0.2 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
-       gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
+       github.com/stretchr/objx v0.5.0 // indirect
+       golang.org/x/mod v0.11.0 // indirect
+       golang.org/x/net v0.11.0 // indirect
+       golang.org/x/sys v0.9.0 // indirect
+       golang.org/x/tools v0.10.0 // indirect
        gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/go.sum b/go.sum
index 460287c..d12be7b 100644
--- a/go.sum
+++ b/go.sum
@@ -1,24 +1,111 @@
-github.com/creack/pty v1.1.9/go.mod 
h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/aws/aws-sdk-go-v2 v1.20.3 
h1:lgeKmAZhlj1JqN43bogrM75spIvYnRxqTAh1iupu1yE=
+github.com/aws/aws-sdk-go-v2 v1.20.3/go.mod 
h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 
h1:OPLEkmhXf6xFPiz0bLeDArZIDx1NNS4oJyG4nv3Gct0=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13/go.mod 
h1:gpAbvyDGQFozTEmlTFO8XcQKHzubdq0LzRyJpG6MiXM=
+github.com/aws/aws-sdk-go-v2/config v1.18.35 
h1:uU9rgCzrW/pVRUUlRULiwKQe8RoEDst1NQu4Qo8kOtk=
+github.com/aws/aws-sdk-go-v2/config v1.18.35/go.mod 
h1:7xF1yr9GBMfYRQI4PLHO8iceqKLM6DpGVEvXI38HB/A=
+github.com/aws/aws-sdk-go-v2/credentials v1.13.34 
h1:/EYG4lzayDd5PY6HQQ2Qyj/cD6CR3kz96BjTZAO5tNo=
+github.com/aws/aws-sdk-go-v2/credentials v1.13.34/go.mod 
h1:+wgdxCGNulHme6kTMZuDL9KOagLPloemoYkfjpQkSEU=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.10 
h1:mgOrtwYfJZ4e3QJe1TrliC/xIkauafGMdLLuCExOqcs=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.10/go.mod 
h1:wMsSLVM2hRpDVhd+3dtLUzqwm7/fjuhNN+b1aOLDt6g=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40 
h1:CXceCS9BrDInRc74GDCQ8Qyk/Gp9VLdK+Rlve+zELSE=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40/go.mod 
h1:5kKmFhLeOVy6pwPDpDNA6/hK/d6URC98pqDDqHgdBx4=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34 
h1:B+nZtd22cbko5+793hg7LEaTeLMiZwlgCLUrN5Y0uzg=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34/go.mod 
h1:RZP0scceAyhMIQ9JvFp7HvkpcgqjL4l/4C+7RAeGbuM=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.3.41 
h1:EcSFdpLdkF3FWizimox0qYLuorn9e4PNMR27mvshGLs=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.3.41/go.mod 
h1:mKxUXW+TuwpCKKHVlmHGVVuBi9y9LKW8AiQodg23M5E=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3 
h1:uHhWcrNBgpm9gi3o8NSQcsAqha/U9OFYzi2k4+0UVz8=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3/go.mod 
h1:jYLMm3Dh0wbeV3lxth5ryks/O2M/omVXWyYm3YcEVqQ=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14 
h1:m0QTSI6pZYJTk5WSKx3fm5cNW/DCicVzULBgU/6IyD0=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14/go.mod 
h1:dDilntgHy9WnHXsh7dDtUPgHKEfTJIBUTHM8OWm0f/0=
+github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.35 
h1:oCUrlTzh9GwhlYdyDGNAS6UgqJRzJp5rKoYCJWqLyZI=
+github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.35/go.mod 
h1:YVHrksq36j0sbXCT6rSuQafpfYkMYqy0QTk7JTCTBIU=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.34 
h1:JwvXk+1ePAD9xkFHprhHYqwsxLDcbNFsPI1IAT2sPS0=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.34/go.mod 
h1:ytsF+t+FApY2lFnN51fJKPhH6ICKOPXKEcwwgmJEdWI=
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.3 
h1:rPDAISw3FjEhrJoaxmQjuD+GgBfv2p3AVhmAcnyqq3k=
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.3/go.mod 
h1:TXBww3ANB+QRj+/dUoYDvI8d/u4F4WzTxD4mxtDoxrg=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.38.4 
h1:P4p346B+YMTTCH9D4I/FWYl+E7BjSLQxqk1e2KYDI5w=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.38.4/go.mod 
h1:uDxTlJiuPhbtRRPMHrPYRkn1Ck7Mtk3BEJiDut+gR5Y=
+github.com/aws/aws-sdk-go-v2/service/sso v1.13.4 
h1:WZPZ7Zf6Yo13lsfTetFrLU/7hZ9CXESDpdIHvmLxQFQ=
+github.com/aws/aws-sdk-go-v2/service/sso v1.13.4/go.mod 
h1:FP05hDXTLouXwAMQ1swqybHy7tHySblMkBMKSumaKg0=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4 
h1:pYFM2U/3/4RLrlMSYXwL1XPBCWvaePk2p+0+i/BgHOs=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4/go.mod 
h1:4pdlNASc29u0j9bq2jIQcBghG5Lx2oQAIj91vo1u1t8=
+github.com/aws/aws-sdk-go-v2/service/sts v1.21.4 
h1:zj4jxK3L54tGyqKleKDMK4vHolENxlq11dF0v1oBkJo=
+github.com/aws/aws-sdk-go-v2/service/sts v1.21.4/go.mod 
h1:CQRMCzYvl5eeAQW3AWkRLS+zGGXCucBnsiQlrs+tCeo=
+github.com/aws/smithy-go v1.14.2 
h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ=
+github.com/aws/smithy-go v1.14.2/go.mod 
h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/kr/pretty v0.2.1/go.mod 
h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
-github.com/kr/pretty v0.3.1/go.mod 
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
-github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
-github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
-github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
-github.com/kr/text v0.2.0/go.mod 
h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
-github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod 
h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
+github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
+github.com/golang/mock v1.6.0/go.mod 
h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
+github.com/google/go-cmp v0.5.8/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/hamba/avro/v2 v2.14.1 
h1:mRkiRKjRTTs+yx0nVuM6z/q5zg3VBZfOe/01ngAnU6A=
+github.com/hamba/avro/v2 v2.14.1/go.mod 
h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
+github.com/jmespath/go-jmespath v0.4.0/go.mod 
h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod 
h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/json-iterator/go v1.1.12 
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod 
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/mitchellh/mapstructure v1.5.0 
h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod 
h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd 
h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/rogpeppe/go-internal v1.9.0 
h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
-github.com/rogpeppe/go-internal v1.9.0/go.mod 
h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod 
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.4 
h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-golang.org/x/exp v0.0.0-20230905200255-921286631fa9 
h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
-golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod 
h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
+github.com/wolfeidau/s3iofs v1.2.0 
h1:kqUIBKZHBLPzE4nThAHJmKhM4/EuIil4bP5U4LvGPfY=
+github.com/wolfeidau/s3iofs v1.2.0/go.mod 
h1:y/7CrhZ5S9hBJ2psOEmvMhTj3u/sqVWYuGoIegHEP7I=
+github.com/yuin/goldmark v1.3.5/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb 
h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA=
+golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod 
h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
+golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod 
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU=
+golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
+golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.1/go.mod 
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
+golang.org/x/tools v0.10.0/go.mod 
h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c 
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
-gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod 
h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/avro_schemas.go b/internal/avro_schemas.go
new file mode 100644
index 0000000..4628787
--- /dev/null
+++ b/internal/avro_schemas.go
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "github.com/hamba/avro/v2"
+
+const (
+       ManifestListV1Key  = "manifest-list-v1"
+       ManifestListV2Key  = "manifest-list-v2"
+       ManifestEntryV1Key = "manifest-entry-v1"
+       ManifestEntryV2Key = "manifest-entry-v2"
+)
+
+var (
+       AvroSchemaCache avro.SchemaCache
+)
+
+func init() {
+       AvroSchemaCache.Add(ManifestListV1Key, avro.MustParse(`{
+               "type": "record",
+               "name": "manifest_file",
+               "fields": [
+                       {"name": "manifest_path", "type": "string", "doc": 
"Location URI with FS scheme", "field-id": 500},
+                       {"name": "manifest_length", "type": "long", "doc": 
"Total file size in bytes", "field-id": 501},
+                       {"name": "partition_spec_id", "type": "int", "doc": 
"Spec ID used to write", "field-id": 502},
+                       {
+                               "name": "added_snapshot_id",
+                               "type": ["null", "long"],
+                               "doc": "Snapshot ID that added the manifest",
+                               "field-id": 503
+                       },
+                       {
+                               "name": "added_data_files_count",
+                               "type": ["null", "int"],
+                               "doc": "Added entry count",
+                               "field-id": 504
+                       },
+                       {
+                               "name": "existing_data_files_count",
+                               "type": ["null", "int"],
+                               "doc": "Existing entry count",
+                               "field-id": 505
+                       },
+                       {
+                               "name": "deleted_data_files_count",
+                               "type": ["null", "int"],
+                               "doc": "Deleted entry count",
+                               "field-id": 506
+                       },
+                       {
+                               "name": "partitions",
+                               "type": [
+                                       "null",
+                                       {
+                                               "type": "array",
+                                               "items": {
+                                                       "type": "record",
+                                                       "name": "r508",
+                                                       "fields": [
+                                                               {
+                                                                       "name": 
"contains_null",
+                                                                       "type": 
"boolean",
+                                                                       "doc": 
"True if any file has a null partition value",
+                                                                       
"field-id": 509
+                                                               },
+                                                               {
+                                                                       "name": 
"contains_nan",
+                                                                       "type": 
["null", "boolean"],
+                                                                       "doc": 
"True if any file has a nan partition value",
+                                                                       
"field-id": 518
+                                                               },
+                                                               {
+                                                                       "name": 
"lower_bound",
+                                                                       "type": 
["null", "bytes"],
+                                                                       "doc": 
"Partition lower bound for all files",                                          
                 
+                                                                       
"field-id": 510
+                                                               },
+                                                               {
+                                                                       "name": 
"upper_bound",
+                                                                       "type": 
["null", "bytes"],
+                                                                       "doc": 
"Partition upper bound for all files",                                          
                 
+                                                                       
"field-id": 511
+                                                               }
+                                                       ]
+                                               },
+                                               "element-id": 508
+                                       }
+                               ],
+                               "doc": "Summary for each partition",            
        
+                               "field-id": 507
+                       },
+                       {"name": "added_rows_count", "type": ["null", "long"], 
"doc": "Added rows count", "field-id": 512},
+                       {
+                               "name": "existing_rows_count",
+                               "type": ["null", "long"],
+                               "doc": "Existing rows count",                   
+                               "field-id": 513
+                       },
+                       {
+                               "name": "deleted_rows_count",
+                               "type": ["null", "long"],
+                               "doc": "Deleted rows count",                    
+                               "field-id": 514
+                       }
+               ]
+       }`))
+
+       AvroSchemaCache.Add(ManifestListV2Key, avro.MustParse(`{
+        "type": "record",
+        "name": "manifest_file",
+        "fields": [
+            {"name": "manifest_path", "type": "string", "doc": "Location URI 
with FS scheme", "field-id": 500},
+            {"name": "manifest_length", "type": "long", "doc": "Total file 
size in bytes", "field-id": 501},
+            {"name": "partition_spec_id", "type": "int", "doc": "Spec ID used 
to write", "field-id": 502},
+            {"name": "content", "type": "int", "doc": "Contents of the 
manifest: 0=data, 1=deletes", "field-id": 517},
+            {
+                "name": "sequence_number",
+                "type": "long",
+                "doc": "Sequence number when the manifest was added",
+                "field-id": 515
+            },
+            {
+                "name": "min_sequence_number",
+                "type": "long",
+                "doc": "Lowest sequence number in the manifest",
+                "field-id": 516
+            },
+            {"name": "added_snapshot_id", "type": "long", "doc": "Snapshot ID 
that added the manifest", "field-id": 503},
+            {"name": "added_files_count", "type": "int", "doc": "Added entry 
count", "field-id": 504},
+            {"name": "existing_files_count", "type": "int", "doc": "Existing 
entry count", "field-id": 505},
+            {"name": "deleted_files_count", "type": "int", "doc": "Deleted 
entry count", "field-id": 506},
+            {"name": "added_rows_count", "type": "long", "doc": "Added rows 
count", "field-id": 512},
+            {"name": "existing_rows_count", "type": "long", "doc": "Existing 
rows count", "field-id": 513},
+            {"name": "deleted_rows_count", "type": "long", "doc": "Deleted 
rows count", "field-id": 514},
+            {
+                "name": "partitions",
+                "type": [
+                    "null",
+                    {
+                        "type": "array",
+                        "items": {
+                            "type": "record",
+                            "name": "r508",
+                            "fields": [
+                                {
+                                    "name": "contains_null",
+                                    "type": "boolean",
+                                    "doc": "True if any file has a null 
partition value",
+                                    "field-id": 509
+                                },
+                                {
+                                    "name": "contains_nan",
+                                    "type": ["null", "boolean"],
+                                    "doc": "True if any file has a nan 
partition value",                                  
+                                    "field-id": 518
+                                },
+                                {
+                                    "name": "lower_bound",
+                                    "type": ["null", "bytes"],
+                                    "doc": "Partition lower bound for all 
files",                                    
+                                    "field-id": 510
+                                },
+                                {
+                                    "name": "upper_bound",
+                                    "type": ["null", "bytes"],
+                                    "doc": "Partition upper bound for all 
files",
+                                    "field-id": 511
+                                }
+                            ]
+                        },
+                        "element-id": 508
+                    }
+                ],
+                "doc": "Summary for each partition",
+                "field-id": 507
+            }
+        ]
+    }`))
+
+       AvroSchemaCache.Add(ManifestEntryV1Key, avro.MustParse(`{
+        "type": "record",
+        "name": "manifest_entry",
+        "fields": [
+            {"name": "status", "type": "int", "field-id": 0},
+            {"name": "snapshot_id", "type": "long", "field-id": 1},
+            {
+                "name": "data_file",
+                "type": {
+                    "type": "record",
+                    "name": "r2",
+                    "fields": [
+                        {"name": "file_path", "type": "string", "doc": 
"Location URI with FS scheme", "field-id": 100},
+                        {
+                            "name": "file_format",
+                            "type": "string",
+                            "doc": "File format name: avro, orc, or parquet",
+                            "field-id": 101
+                        },
+                        {
+                            "name": "partition",
+                            "type": {
+                                "type": "record",
+                                "name": "r102",
+                                "fields": [
+                                    {"field-id": 1000, "name": "VendorID", 
"type": ["null", "int"]},
+                                    {
+                                        "field-id": 1001,                      
                  
+                                        "name": "tpep_pickup_datetime",
+                                        "type": ["null", {"type": "int", 
"logicalType": "date"}]
+                                    }
+                                ]
+                            },
+                            "field-id": 102
+                        },
+                        {"name": "record_count", "type": "long", "doc": 
"Number of records in the file", "field-id": 103},
+                        {"name": "file_size_in_bytes", "type": "long", "doc": 
"Total file size in bytes", "field-id": 104},
+                        {"name": "block_size_in_bytes", "type": "long", 
"field-id": 105},
+                        {
+                            "name": "column_sizes",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k117_v118",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 117},
+                                            {"name": "value", "type": "long", 
"field-id": 118}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to total size on disk",   
                         
+                            "field-id": 108
+                        },
+                        {
+                            "name": "value_counts",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k119_v120",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 119},
+                                            {"name": "value", "type": "long", 
"field-id": 120}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to total count, including 
null and NaN",                            
+                            "field-id": 109
+                        },
+                        {
+                            "name": "null_value_counts",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k121_v122",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 121},
+                                            {"name": "value", "type": "long", 
"field-id": 122}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to null value count",     
                       
+                            "field-id": 110
+                        },
+                        {
+                            "name": "nan_value_counts",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k138_v139",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 138},
+                                            {"name": "value", "type": "long", 
"field-id": 139}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to number of NaN values 
in the column",                            
+                            "field-id": 137
+                        },
+                        {
+                            "name": "lower_bounds",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k126_v127",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 126},
+                                            {"name": "value", "type": "bytes", 
"field-id": 127}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to lower bound",          
                  
+                            "field-id": 125
+                        },
+                        {
+                            "name": "upper_bounds",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k129_v130",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 129},
+                                            {"name": "value", "type": "bytes", 
"field-id": 130}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to upper bound",          
                  
+                            "field-id": 128
+                        },
+                        {
+                            "name": "key_metadata",
+                            "type": ["null", "bytes"],
+                            "doc": "Encryption key metadata blob",             
               
+                            "field-id": 131
+                        },
+                        {
+                            "name": "split_offsets",
+                            "type": ["null", {"type": "array", "items": 
"long", "element-id": 133}],
+                            "doc": "Splittable offsets",                       
     
+                            "field-id": 132
+                        },
+                        {
+                            "name": "sort_order_id",
+                            "type": ["null", "int"],
+                            "doc": "Sort order ID",                            
+                            "field-id": 140
+                        }
+                    ]
+                },
+                "field-id": 2
+            }
+        ]
+    }`))
+
+       AvroSchemaCache.Add(ManifestEntryV2Key, avro.MustParse(`{
+        "type": "record",
+        "name": "manifest_entry",
+        "fields": [
+            {"name": "status", "type": "int", "field-id": 0},
+            {"name": "snapshot_id", "type": ["null", "long"], "field-id": 1},
+                       {"name": "sequence_number", "type": ["null", "long"], 
"field-id": 3},
+                       {"name": "file_sequence_number", "type": ["null", 
"long"], "field-id": 4},
+            {
+                "name": "data_file",
+                "type": {
+                    "type": "record",
+                    "name": "r2",
+                    "fields": [
+                                               {"name": "content", "type": 
"int", "doc": "Type of content stored by the data file", "field-id": 134},
+                        {"name": "file_path", "type": "string", "doc": 
"Location URI with FS scheme", "field-id": 100},
+                        {
+                            "name": "file_format",
+                            "type": "string",
+                            "doc": "File format name: avro, orc, or parquet",
+                            "field-id": 101
+                        },
+                        {
+                            "name": "partition",
+                            "type": {
+                                "type": "record",
+                                "name": "r102",
+                                "fields": [
+                                    {"field-id": 1000, "name": "VendorID", 
"type": ["null", "int"]},
+                                    {
+                                        "field-id": 1001,                      
                  
+                                        "name": "tpep_pickup_datetime",
+                                        "type": ["null", {"type": "int", 
"logicalType": "date"}]
+                                    }
+                                ]
+                            },
+                            "field-id": 102
+                        },
+                        {"name": "record_count", "type": "long", "doc": 
"Number of records in the file", "field-id": 103},
+                        {"name": "file_size_in_bytes", "type": "long", "doc": 
"Total file size in bytes", "field-id": 104},                        
+                        {
+                            "name": "column_sizes",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k117_v118",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 117},
+                                            {"name": "value", "type": "long", 
"field-id": 118}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to total size on disk",   
                         
+                            "field-id": 108
+                        },
+                        {
+                            "name": "value_counts",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k119_v120",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 119},
+                                            {"name": "value", "type": "long", 
"field-id": 120}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to total count, including 
null and NaN",                            
+                            "field-id": 109
+                        },
+                        {
+                            "name": "null_value_counts",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k121_v122",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 121},
+                                            {"name": "value", "type": "long", 
"field-id": 122}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to null value count",     
                       
+                            "field-id": 110
+                        },
+                        {
+                            "name": "nan_value_counts",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k138_v139",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 138},
+                                            {"name": "value", "type": "long", 
"field-id": 139}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to number of NaN values 
in the column",                            
+                            "field-id": 137
+                        },
+                        {
+                            "name": "lower_bounds",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k126_v127",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 126},
+                                            {"name": "value", "type": "bytes", 
"field-id": 127}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to lower bound",          
                  
+                            "field-id": 125
+                        },
+                        {
+                            "name": "upper_bounds",
+                            "type": [
+                                "null",
+                                {
+                                    "type": "array",
+                                    "items": {
+                                        "type": "record",
+                                        "name": "k129_v130",
+                                        "fields": [
+                                            {"name": "key", "type": "int", 
"field-id": 129},
+                                            {"name": "value", "type": "bytes", 
"field-id": 130}
+                                        ]
+                                    },
+                                    "logicalType": "map"
+                                }
+                            ],
+                            "doc": "Map of column id to upper bound",          
                  
+                            "field-id": 128
+                        },
+                        {
+                            "name": "key_metadata",
+                            "type": ["null", "bytes"],
+                            "doc": "Encryption key metadata blob",             
               
+                            "field-id": 131
+                        },
+                        {
+                            "name": "split_offsets",
+                            "type": ["null", {"type": "array", "items": 
"long", "element-id": 133}],
+                            "doc": "Splittable offsets",                       
     
+                            "field-id": 132
+                        },
+                                               {
+                                                       "name": "equality_ids",
+                                                       "type": ["null", 
{"type": "array", "items": "int", "element-id": 136}],
+                                                       "doc": "Field ids used 
to determine row equality for delete files",
+                                                       "field-id": 135
+                                               },
+                        {
+                            "name": "sort_order_id",
+                            "type": ["null", "int"],
+                            "doc": "Sort order ID",                            
+                            "field-id": 140
+                        }
+                    ]
+                },
+                "field-id": 2
+            }
+        ]
+    }`))
+}
diff --git a/internal/mock_fs.go b/internal/mock_fs.go
new file mode 100644
index 0000000..95f6c3f
--- /dev/null
+++ b/internal/mock_fs.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "bytes"
+       "errors"
+       "io/fs"
+
+       "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/mock"
+)
+
+type MockFS struct {
+       mock.Mock
+}
+
+func (m *MockFS) Open(name string) (io.File, error) {
+       args := m.Called(name)
+       return args.Get(0).(io.File), args.Error(1)
+}
+
+func (m *MockFS) Remove(name string) error {
+       return m.Called(name).Error(0)
+}
+
+type MockFSReadFile struct {
+       MockFS
+}
+
+func (m *MockFSReadFile) ReadFile(name string) ([]byte, error) {
+       args := m.Called(name)
+       return args.Get(0).([]byte), args.Error(1)
+}
+
+type MockFile struct {
+       Contents *bytes.Reader
+
+       closed bool
+}
+
+func (m *MockFile) Stat() (fs.FileInfo, error) {
+       return nil, nil
+}
+
+func (m *MockFile) Read(p []byte) (int, error) {
+       return m.Contents.Read(p)
+}
+
+func (m *MockFile) Close() error {
+       if m.closed {
+               return errors.New("already closed")
+       }
+       m.closed = true
+       return nil
+}
+
+func (m *MockFile) ReadAt(p []byte, off int64) (n int, err error) {
+       if m.closed {
+               return 0, errors.New("already closed")
+       }
+       return m.Contents.ReadAt(p, off)
+}
+
+func (m *MockFile) Seek(offset int64, whence int) (n int64, err error) {
+       if m.closed {
+               return 0, errors.New("already closed")
+       }
+       return m.Contents.Seek(offset, whence)
+}
diff --git a/io/io.go b/io/io.go
new file mode 100644
index 0000000..abe5971
--- /dev/null
+++ b/io/io.go
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io
+
+import (
+       "errors"
+       "fmt"
+       "io"
+       "io/fs"
+       "net/url"
+       "strings"
+)
+
// IO is an interface to a hierarchical file system.
//
// The IO interface is the minimum implementation required for a file
// system to utilize an iceberg table. A file system may implement
// additional interfaces, such as ReadFileIO, to provide additional or
// optimized functionality.
type IO interface {
	// Open opens the named file.
	//
	// When Open returns an error, it should be of type *PathError
	// with the Op field set to "open", the Path field set to name,
	// and the Err field describing the problem.
	//
	// Open should reject attempts to open names that do not satisfy
	// fs.ValidPath(name), returning a *PathError with Err set to
	// fs.ErrInvalid or fs.ErrNotExist.
	Open(name string) (File, error)

	// Remove removes the named file or (empty) directory.
	//
	// If there is an error, it will be of type *PathError.
	Remove(name string) error
}
+
// ReadFileIO is the interface implemented by a file system that
// provides an optimized implementation of ReadFile.
type ReadFileIO interface {
	IO

	// ReadFile reads the named file and returns its contents.
	// A successful call returns a nil error, not io.EOF.
	// (Because ReadFile reads the whole file, the expected EOF
	// from the final Read is not treated as an error to be reported.)
	//
	// The caller is permitted to modify the returned byte slice.
	// This method should return a copy of the underlying data.
	ReadFile(name string) ([]byte, error)
}
+
// A File provides access to a single file. The File interface is the
// minimum implementation required for Iceberg to interact with a file.
// Directory files should also implement ReadDirFile.
type File interface {
	fs.File
	io.ReadSeekCloser
	io.ReaderAt
}
+
// A ReadDirFile is a directory file whose entries can be read with the
// ReadDir method. Every directory file should implement this interface.
// (It is permissible for any file to implement this interface, but
// if so ReadDir should return an error for non-directories.)
type ReadDirFile interface {
	File

	// ReadDir reads the contents of the directory and returns a slice
	// of up to n DirEntry values in directory order. Subsequent calls
	// on the same file will yield further DirEntry values.
	//
	// If n > 0, ReadDir returns at most n DirEntry structures. In this
	// case, if ReadDir returns an empty slice, it will return a non-nil
	// error explaining why.
	//
	// At the end of a directory, the error is io.EOF. (ReadDir must return
	// io.EOF itself, not an error wrapping io.EOF.)
	//
	// If n <= 0, ReadDir returns all the DirEntry values from the directory
	// in a single slice. In this case, if ReadDir succeeds (reads all the way
	// to the end of the directory), it returns the slice and a nil error.
	// If it encounters an error before the end of the directory, ReadDir
	// returns the DirEntry list read until that point and a non-nil error.
	ReadDir(n int) ([]fs.DirEntry, error)
}
+
+// FS wraps an io/fs.FS as an IO interface.
+func FS(fsys fs.FS) IO {
+       if _, ok := fsys.(fs.ReadFileFS); ok {
+               return readFileFS{ioFS{fsys, nil}}
+       }
+       return ioFS{fsys, nil}
+}
+
+// FSPreProcName wraps an io/fs.FS like FS, only if fn is non-nil then
+// it is called to preprocess any filenames before they are passed to
+// the underlying fsys.
+func FSPreProcName(fsys fs.FS, fn func(string) string) IO {
+       if _, ok := fsys.(fs.ReadFileFS); ok {
+               return readFileFS{ioFS{fsys, fn}}
+       }
+       return ioFS{fsys, fn}
+}
+
+type readFileFS struct {
+       ioFS
+}
+
+func (r readFileFS) ReadFile(name string) ([]byte, error) {
+       if r.preProcessName != nil {
+               name = r.preProcessName(name)
+       }
+
+       rfs, ok := r.fsys.(fs.ReadFileFS)
+       if !ok {
+               return nil, errMissingReadFile
+       }
+       return rfs.ReadFile(name)
+}
+
+type ioFS struct {
+       fsys fs.FS
+
+       preProcessName func(string) string
+}
+
+func (f ioFS) Open(name string) (File, error) {
+       if f.preProcessName != nil {
+               name = f.preProcessName(name)
+       }
+
+       if name == "/" {
+               name = "."
+       } else {
+               name = strings.TrimPrefix(name, "/")
+       }
+       file, err := f.fsys.Open(name)
+       if err != nil {
+               return nil, err
+       }
+
+       return ioFile{file}, nil
+}
+
+func (f ioFS) Remove(name string) error {
+       r, ok := f.fsys.(interface{ Remove(name string) error })
+       if !ok {
+               return errMissingRemove
+       }
+       return r.Remove(name)
+}
+
+var (
+       errMissingReadDir  = errors.New("fs.File directory missing ReadDir 
method")
+       errMissingSeek     = errors.New("fs.File missing Seek method")
+       errMissingReadAt   = errors.New("fs.File missing ReadAt")
+       errMissingRemove   = errors.New("fs.FS missing Remove method")
+       errMissingReadFile = errors.New("fs.FS missing ReadFile method")
+)
+
+type ioFile struct {
+       file fs.File
+}
+
+func (f ioFile) Close() error               { return f.file.Close() }
+func (f ioFile) Read(b []byte) (int, error) { return f.file.Read(b) }
+func (f ioFile) Stat() (fs.FileInfo, error) { return f.file.Stat() }
+func (f ioFile) Seek(offset int64, whence int) (int64, error) {
+       s, ok := f.file.(io.Seeker)
+       if !ok {
+               return 0, errMissingSeek
+       }
+       return s.Seek(offset, whence)
+}
+
+func (f ioFile) ReadAt(p []byte, off int64) (n int, err error) {
+       r, ok := f.file.(io.ReaderAt)
+       if !ok {
+               return 0, errMissingReadAt
+       }
+       return r.ReadAt(p, off)
+}
+
+func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) {
+       d, ok := f.file.(fs.ReadDirFile)
+       if !ok {
+               return nil, errMissingReadDir
+       }
+
+       return d.ReadDir(count)
+}
+
+func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {
+       parsed, err := url.Parse(path)
+       if err != nil {
+               return nil, err
+       }
+
+       switch parsed.Scheme {
+       case "s3", "s3a", "s3n":
+               return createS3FileIO(parsed, props)
+       case "file", "":
+               return LocalFS{}, nil
+       default:
+               return nil, fmt.Errorf("IO for file '%s' not implemented", path)
+       }
+}
+
+// LoadFS takes a map of properties and an optional URI location
+// and attempts to infer an IO object from it.
+//
+// A schema of "file://" or an empty string will result in a LocalFS
+// implementation. Otherwise this will return an error if the schema
+// does not yet have an implementation here.
+//
+// Currently only LocalFS and S3 are implemented.
+func LoadFS(props map[string]string, location string) (IO, error) {
+       if location == "" {
+               location = props["warehouse"]
+       }
+
+       iofs, err := inferFileIOFromSchema(location, props)
+       if err != nil {
+               return nil, err
+       }
+
+       if iofs == nil {
+               iofs = LocalFS{}
+       }
+
+       return iofs, nil
+}
diff --git a/go.mod b/io/local.go
similarity index 66%
copy from go.mod
copy to io/local.go
index 6bea61d..befa831 100644
--- a/go.mod
+++ b/io/local.go
@@ -15,19 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-module github.com/apache/iceberg-go
+package io
 
-go 1.20
+import "os"
 
-require (
-       github.com/stretchr/testify v1.8.4
-       golang.org/x/exp v0.0.0-20230905200255-921286631fa9
-)
// LocalFS is an implementation of IO that implements interaction with
// the local file system via the os package.
type LocalFS struct{}
 
-require (
-       github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/kr/pretty v0.3.1 // indirect
-       github.com/pmezard/go-difflib v1.0.0 // indirect
-       gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
-       gopkg.in/yaml.v3 v3.0.1 // indirect
-)
// Open opens the named file on the local file system using os.Open.
func (LocalFS) Open(name string) (File, error) {
	return os.Open(name)
}
+
// Remove removes the named file or (empty) directory using os.Remove.
func (LocalFS) Remove(name string) error {
	return os.Remove(name)
}
diff --git a/io/s3.go b/io/s3.go
new file mode 100644
index 0000000..19f0887
--- /dev/null
+++ b/io/s3.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "net/url"
+       "os"
+       "strings"
+
+       "github.com/aws/aws-sdk-go-v2/aws"
+       awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
+       "github.com/aws/aws-sdk-go-v2/config"
+       "github.com/aws/aws-sdk-go-v2/credentials"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
+       "github.com/wolfeidau/s3iofs"
+)
+
// Constants for S3 configuration options, used as keys in the
// properties map passed to LoadFS / createS3FileIO.
const (
	S3Region          = "s3.region"            // AWS region for the bucket
	S3SessionToken    = "s3.session-token"     // static credential session token
	S3SecretAccessKey = "s3.secret-access-key" // static credential secret key
	S3AccessKeyID     = "s3.access-key-id"     // static credential access key id
	S3EndpointURL     = "s3.endpoint"          // custom S3 endpoint override
	S3ProxyURI        = "s3.proxy-uri"         // HTTP proxy for S3 requests
)
+
+func createS3FileIO(parsed *url.URL, props map[string]string) (IO, error) {
+       opts := []func(*config.LoadOptions) error{}
+       endpoint, ok := props[S3EndpointURL]
+       if !ok {
+               endpoint = os.Getenv("AWS_S3_ENDPOINT")
+       }
+
+       if endpoint != "" {
+               opts = append(opts, 
config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service,
 region string, options ...interface{}) (aws.Endpoint, error) {
+                       if service != s3.ServiceID {
+                               // fallback to default resolution for the 
service
+                               return aws.Endpoint{}, 
&aws.EndpointNotFoundError{}
+                       }
+
+                       return aws.Endpoint{
+                               URL:               endpoint,
+                               SigningRegion:     region,
+                               HostnameImmutable: true,
+                       }, nil
+               })))
+       }
+
+       if region, ok := props[S3Region]; ok {
+               opts = append(opts, config.WithRegion(region))
+       }
+
+       accessKey, secretAccessKey := props[S3AccessKeyID], 
props[S3SecretAccessKey]
+       token := props[S3SessionToken]
+       if accessKey != "" || secretAccessKey != "" || token != "" {
+               opts = append(opts, 
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
+                       props[S3AccessKeyID], props[S3SecretAccessKey], 
props[S3SessionToken])))
+       }
+
+       if proxy, ok := props[S3ProxyURI]; ok {
+               proxyURL, err := url.Parse(proxy)
+               if err != nil {
+                       return nil, fmt.Errorf("invalid s3 proxy url '%s'", 
proxy)
+               }
+
+               opts = append(opts, 
config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
+                       func(t *http.Transport) {
+                               t.Proxy = http.ProxyURL(proxyURL)
+                       },
+               )))
+       }
+
+       awscfg, err := config.LoadDefaultConfig(context.Background(), opts...)
+       if err != nil {
+               return nil, err
+       }
+
+       preprocess := func(n string) string {
+               _, after, found := strings.Cut(n, "://")
+               if found {
+                       n = after
+               }
+
+               return strings.TrimPrefix(n, parsed.Host)
+       }
+
+       s3fs := s3iofs.New(parsed.Host, awscfg)
+       return FSPreProcName(s3fs, preprocess), nil
+}
diff --git a/manifest.go b/manifest.go
new file mode 100644
index 0000000..628b721
--- /dev/null
+++ b/manifest.go
@@ -0,0 +1,803 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "io"
+       "sync"
+
+       iceio "github.com/apache/iceberg-go/io"
+
+       "github.com/hamba/avro/v2/ocf"
+)
+
// ManifestContent indicates the type of data inside of the files
// described by a manifest. This will indicate whether the data files
// contain active data or deleted rows.
type ManifestContent int32

const (
	// ManifestContentData marks a manifest that tracks data files.
	ManifestContentData ManifestContent = 0
	// ManifestContentDeletes marks a manifest that tracks delete files.
	ManifestContentDeletes ManifestContent = 1
)
+
// FieldSummary is the summary for a single partition field within a
// manifest-list entry. The pointer fields are optional in the avro
// schema and are nil when they were absent from the file.
type FieldSummary struct {
	ContainsNull bool    `avro:"contains_null"` // true if any data file has a null for this field
	ContainsNaN  *bool   `avro:"contains_nan"`  // optional; relevant for floating point fields
	LowerBound   *[]byte `avro:"lower_bound"`   // optional serialized lower bound
	UpperBound   *[]byte `avro:"upper_bound"`   // optional serialized upper bound
}
+
// ManifestV1Builder is a helper for building a V1 manifest file
// struct which will conform to the ManifestFile interface.
type ManifestV1Builder struct {
	m *manifestFileV1
}

// NewManifestV1Builder is passed all of the required fields and then allows
// all of the optional fields to be set by calling the corresponding methods
// before calling [ManifestV1Builder.Build] to construct the object.
func NewManifestV1Builder(path string, length int64, partitionSpecID int32) *ManifestV1Builder {
	return &ManifestV1Builder{
		m: &manifestFileV1{
			Path:   path,
			Len:    length,
			SpecID: partitionSpecID,
		},
	}
}

// AddedSnapshotID sets the optional id of the snapshot this manifest was added in.
func (b *ManifestV1Builder) AddedSnapshotID(id int64) *ManifestV1Builder {
	b.m.AddedSnapshotID = &id
	return b
}

// AddedFiles sets the optional count of entries with status ADDED.
func (b *ManifestV1Builder) AddedFiles(cnt int32) *ManifestV1Builder {
	b.m.AddedFilesCount = &cnt
	return b
}

// ExistingFiles sets the optional count of entries with status EXISTING.
func (b *ManifestV1Builder) ExistingFiles(cnt int32) *ManifestV1Builder {
	b.m.ExistingFilesCount = &cnt
	return b
}

// DeletedFiles sets the optional count of entries with status DELETED.
func (b *ManifestV1Builder) DeletedFiles(cnt int32) *ManifestV1Builder {
	b.m.DeletedFilesCount = &cnt
	return b
}

// AddedRows sets the optional row count across files with status ADDED.
func (b *ManifestV1Builder) AddedRows(cnt int64) *ManifestV1Builder {
	b.m.AddedRowsCount = &cnt
	return b
}

// ExistingRows sets the optional row count across files with status EXISTING.
func (b *ManifestV1Builder) ExistingRows(cnt int64) *ManifestV1Builder {
	b.m.ExistingRowsCount = &cnt
	return b
}

// DeletedRows sets the optional row count across files with status DELETED.
func (b *ManifestV1Builder) DeletedRows(cnt int64) *ManifestV1Builder {
	b.m.DeletedRowsCount = &cnt
	return b
}

// Partitions sets the optional per-partition-field summary list.
func (b *ManifestV1Builder) Partitions(p []FieldSummary) *ManifestV1Builder {
	b.m.PartitionList = &p
	return b
}

// KeyMetadata sets the optional encryption key metadata.
func (b *ManifestV1Builder) KeyMetadata(km []byte) *ManifestV1Builder {
	b.m.Key = km
	return b
}

// Build returns the constructed manifest file, after calling Build this
// builder should not be used further as we avoid copying by just returning
// a pointer to the constructed manifest file. Further calls to the modifier
// methods after calling build would modify the constructed ManifestFile.
func (b *ManifestV1Builder) Build() ManifestFile {
	return b.m
}
+
// manifestFileV1 is the avro representation of a v1 manifest-list entry.
// Pointer fields are optional in the v1 schema and are nil when absent;
// the accessor methods translate nil to the appropriate zero value.
type manifestFileV1 struct {
	Path               string          `avro:"manifest_path"`
	Len                int64           `avro:"manifest_length"`
	SpecID             int32           `avro:"partition_spec_id"`
	AddedSnapshotID    *int64          `avro:"added_snapshot_id"`
	AddedFilesCount    *int32          `avro:"added_data_files_count"`
	ExistingFilesCount *int32          `avro:"existing_data_files_count"`
	DeletedFilesCount  *int32          `avro:"deleted_data_files_count"`
	AddedRowsCount     *int64          `avro:"added_rows_count"`
	ExistingRowsCount  *int64          `avro:"existing_rows_count"`
	DeletedRowsCount   *int64          `avro:"deleted_rows_count"`
	PartitionList      *[]FieldSummary `avro:"partitions"`
	Key                []byte          `avro:"key_metadata"`
}
+
+func (*manifestFileV1) Version() int             { return 1 }
+func (m *manifestFileV1) FilePath() string       { return m.Path }
+func (m *manifestFileV1) Length() int64          { return m.Len }
+func (m *manifestFileV1) PartitionSpecID() int32 { return m.SpecID }
+func (m *manifestFileV1) ManifestContent() ManifestContent {
+       return ManifestContentData
+}
+func (m *manifestFileV1) SnapshotID() int64 {
+       if m.AddedSnapshotID == nil {
+               return 0
+       }
+       return *m.AddedSnapshotID
+}
+
+func (m *manifestFileV1) AddedDataFiles() int32 {
+       if m.AddedFilesCount == nil {
+               return 0
+       }
+       return *m.AddedFilesCount
+}
+
+func (m *manifestFileV1) ExistingDataFiles() int32 {
+       if m.ExistingFilesCount == nil {
+               return 0
+       }
+       return *m.ExistingFilesCount
+}
+
+func (m *manifestFileV1) DeletedDataFiles() int32 {
+       if m.DeletedFilesCount == nil {
+               return 0
+       }
+       return *m.DeletedFilesCount
+}
+
+func (m *manifestFileV1) AddedRows() int64 {
+       if m.AddedRowsCount == nil {
+               return 0
+       }
+       return *m.AddedRowsCount
+}
+
+func (m *manifestFileV1) ExistingRows() int64 {
+       if m.ExistingRowsCount == nil {
+               return 0
+       }
+       return *m.ExistingRowsCount
+}
+
+func (m *manifestFileV1) DeletedRows() int64 {
+       if m.DeletedRowsCount == nil {
+               return 0
+       }
+       return *m.DeletedRowsCount
+}
+
+func (m *manifestFileV1) HasAddedFiles() bool {
+       return m.AddedFilesCount == nil || *m.AddedFilesCount > 0
+}
+
+func (m *manifestFileV1) HasExistingFiles() bool {
+       return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0
+}
+
+func (m *manifestFileV1) SequenceNum() int64    { return 0 }
+func (m *manifestFileV1) MinSequenceNum() int64 { return 0 }
+func (m *manifestFileV1) KeyMetadata() []byte   { return m.Key }
+func (m *manifestFileV1) Partitions() []FieldSummary {
+       if m.PartitionList == nil {
+               return nil
+       }
+       return *m.PartitionList
+}
+
+func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
+       return fetchManifestEntries(m, fs, discardDeleted)
+}
+
// ManifestV2Builder is a helper for building a V2 manifest file
// struct which will conform to the ManifestFile interface.
type ManifestV2Builder struct {
	m *manifestFileV2
}

// NewManifestV2Builder is constructed with the primary fields, with the remaining
// fields set to their zero value unless modified by calling the corresponding
// methods of the builder. Then calling [ManifestV2Builder.Build] to retrieve the
// constructed ManifestFile.
func NewManifestV2Builder(path string, length int64, partitionSpecID int32, content ManifestContent, addedSnapshotID int64) *ManifestV2Builder {
	return &ManifestV2Builder{
		m: &manifestFileV2{
			Path:            path,
			Len:             length,
			SpecID:          partitionSpecID,
			Content:         content,
			AddedSnapshotID: addedSnapshotID,
		},
	}
}

// SequenceNum sets the sequence number and minimum sequence number together.
func (b *ManifestV2Builder) SequenceNum(num, minSeqNum int64) *ManifestV2Builder {
	b.m.SeqNumber, b.m.MinSeqNumber = num, minSeqNum
	return b
}

// AddedFiles sets the count of entries with status ADDED.
func (b *ManifestV2Builder) AddedFiles(cnt int32) *ManifestV2Builder {
	b.m.AddedFilesCount = cnt
	return b
}

// ExistingFiles sets the count of entries with status EXISTING.
func (b *ManifestV2Builder) ExistingFiles(cnt int32) *ManifestV2Builder {
	b.m.ExistingFilesCount = cnt
	return b
}

// DeletedFiles sets the count of entries with status DELETED.
func (b *ManifestV2Builder) DeletedFiles(cnt int32) *ManifestV2Builder {
	b.m.DeletedFilesCount = cnt
	return b
}

// AddedRows sets the row count across files with status ADDED.
func (b *ManifestV2Builder) AddedRows(cnt int64) *ManifestV2Builder {
	b.m.AddedRowsCount = cnt
	return b
}

// ExistingRows sets the row count across files with status EXISTING.
func (b *ManifestV2Builder) ExistingRows(cnt int64) *ManifestV2Builder {
	b.m.ExistingRowsCount = cnt
	return b
}

// DeletedRows sets the row count across files with status DELETED.
func (b *ManifestV2Builder) DeletedRows(cnt int64) *ManifestV2Builder {
	b.m.DeletedRowsCount = cnt
	return b
}

// Partitions sets the optional per-partition-field summary list.
func (b *ManifestV2Builder) Partitions(p []FieldSummary) *ManifestV2Builder {
	b.m.PartitionList = &p
	return b
}

// KeyMetadata sets the optional encryption key metadata.
func (b *ManifestV2Builder) KeyMetadata(km []byte) *ManifestV2Builder {
	b.m.Key = km
	return b
}

// Build returns the constructed manifest file, after calling Build this
// builder should not be used further as we avoid copying by just returning
// a pointer to the constructed manifest file. Further calls to the modifier
// methods after calling build would modify the constructed ManifestFile.
func (b *ManifestV2Builder) Build() ManifestFile {
	return b.m
}
+
// manifestFileV2 is the avro representation of a v2 manifest-list entry.
// Unlike v1, the counts and the snapshot id are required fields in the v2
// schema, so they are plain values rather than pointers.
type manifestFileV2 struct {
	Path               string          `avro:"manifest_path"`
	Len                int64           `avro:"manifest_length"`
	SpecID             int32           `avro:"partition_spec_id"`
	Content            ManifestContent `avro:"content"`
	SeqNumber          int64           `avro:"sequence_number"`
	MinSeqNumber       int64           `avro:"min_sequence_number"`
	AddedSnapshotID    int64           `avro:"added_snapshot_id"`
	AddedFilesCount    int32           `avro:"added_files_count"`
	ExistingFilesCount int32           `avro:"existing_files_count"`
	DeletedFilesCount  int32           `avro:"deleted_files_count"`
	AddedRowsCount     int64           `avro:"added_rows_count"`
	ExistingRowsCount  int64           `avro:"existing_rows_count"`
	DeletedRowsCount   int64           `avro:"deleted_rows_count"`
	PartitionList      *[]FieldSummary `avro:"partitions"`
	Key                []byte          `avro:"key_metadata"`
}
+
+func (*manifestFileV2) Version() int { return 2 }
+
+func (m *manifestFileV2) FilePath() string                 { return m.Path }
+func (m *manifestFileV2) Length() int64                    { return m.Len }
+func (m *manifestFileV2) PartitionSpecID() int32           { return m.SpecID }
+func (m *manifestFileV2) ManifestContent() ManifestContent { return m.Content }
+func (m *manifestFileV2) SnapshotID() int64 {
+       return m.AddedSnapshotID
+}
+
+func (m *manifestFileV2) AddedDataFiles() int32 {
+       return m.AddedFilesCount
+}
+
+func (m *manifestFileV2) ExistingDataFiles() int32 {
+       return m.ExistingFilesCount
+}
+
+func (m *manifestFileV2) DeletedDataFiles() int32 {
+       return m.DeletedFilesCount
+}
+
+func (m *manifestFileV2) AddedRows() int64 {
+       return m.AddedRowsCount
+}
+
+func (m *manifestFileV2) ExistingRows() int64 {
+       return m.ExistingRowsCount
+}
+
+func (m *manifestFileV2) DeletedRows() int64 {
+       return m.DeletedRowsCount
+}
+
+func (m *manifestFileV2) SequenceNum() int64    { return m.SeqNumber }
+func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber }
+func (m *manifestFileV2) KeyMetadata() []byte   { return m.Key }
+
+func (m *manifestFileV2) Partitions() []FieldSummary {
+       if m.PartitionList == nil {
+               return nil
+       }
+       return *m.PartitionList
+}
+
+func (m *manifestFileV2) HasAddedFiles() bool {
+       return m.AddedFilesCount > 0
+}
+
+func (m *manifestFileV2) HasExistingFiles() bool {
+       return m.ExistingFilesCount > 0
+}
+
+func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
+       return fetchManifestEntries(m, fs, discardDeleted)
+}
+
+func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
+       f, err := fs.Open(m.FilePath())
+       if err != nil {
+               return nil, err
+       }
+       defer f.Close()
+
+       dec, err := ocf.NewDecoder(f)
+       if err != nil {
+               return nil, err
+       }
+
+       metadata := dec.Metadata()
+       isVer1 := true
+       if string(metadata["format-version"]) == "2" {
+               isVer1 = false
+       }
+
+       results := make([]ManifestEntry, 0)
+       for dec.HasNext() {
+               var tmp ManifestEntry
+               if isVer1 {
+                       tmp = &manifestEntryV1{}
+               } else {
+                       tmp = &manifestEntryV2{}
+               }
+
+               if err := dec.Decode(tmp); err != nil {
+                       return nil, err
+               }
+
+               if !discardDeleted || tmp.Status() != EntryStatusDELETED {
+                       tmp.inheritSeqNum(m)
+                       results = append(results, tmp)
+               }
+       }
+
+       return results, dec.Error()
+}
+
// ManifestFile is the interface which covers both V1 and V2 manifest files.
// It is implemented by manifestFileV1 and manifestFileV2; use the
// corresponding builders to construct instances.
type ManifestFile interface {
	// Version returns the version number of this manifest file.
	// It should be 1 or 2.
	Version() int
	// FilePath is the location URI of this manifest file.
	FilePath() string
	// Length is the length in bytes of the manifest file.
	Length() int64
	// PartitionSpecID is the ID of the partition spec used to write
	// this manifest. It must be listed in the table metadata
	// partition-specs.
	PartitionSpecID() int32
	// ManifestContent is the type of files tracked by this manifest,
	// either data or delete files. All v1 manifests track data files.
	ManifestContent() ManifestContent
	// SnapshotID is the ID of the snapshot where this manifest file
	// was added. Returns 0 for a v1 manifest where it was not set.
	SnapshotID() int64
	// AddedDataFiles returns the number of entries in the manifest that
	// have the status of EntryStatusADDED. Returns 0 for a v1 manifest
	// where the count was not set.
	AddedDataFiles() int32
	// ExistingDataFiles returns the number of entries in the manifest
	// which have the status of EntryStatusEXISTING.
	ExistingDataFiles() int32
	// DeletedDataFiles returns the number of entries in the manifest
	// which have the status of EntryStatusDELETED.
	DeletedDataFiles() int32
	// AddedRows returns the number of rows in all files of the manifest
	// that have status EntryStatusADDED.
	AddedRows() int64
	// ExistingRows returns the number of rows in all files of the manifest
	// which have status EntryStatusEXISTING.
	ExistingRows() int64
	// DeletedRows returns the number of rows in all files of the manifest
	// which have status EntryStatusDELETED.
	DeletedRows() int64
	// SequenceNum returns the sequence number when this manifest was
	// added to the table. Will be 0 for v1 manifest lists.
	SequenceNum() int64
	// MinSequenceNum is the minimum data sequence number of all live data
	// or delete files in the manifest. Will be 0 for v1 manifest lists.
	MinSequenceNum() int64
	// KeyMetadata returns implementation-specific key metadata for encryption
	// if it exists in the manifest list.
	KeyMetadata() []byte
	// Partitions returns a list of field summaries for each partition
	// field in the spec. Each field in the list corresponds to a field in
	// the manifest file's partition spec. Returns nil when absent.
	Partitions() []FieldSummary

	// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
	HasAddedFiles() bool
	// HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null.
	HasExistingFiles() bool
	// FetchEntries reads the manifest list file to fetch the list of
	// manifest entries using the provided file system IO interface.
	// If discardDeleted is true, entries for files containing deleted rows
	// will be skipped.
	FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error)
}
+
+// ReadManifestList reads in an avro manifest list file and returns a slice
+// of manifest files or an error if one is encountered.
+func ReadManifestList(in io.Reader) ([]ManifestFile, error) {
+       dec, err := ocf.NewDecoder(in)
+       if err != nil {
+               return nil, err
+       }
+
+       out := make([]ManifestFile, 0)
+
+       for dec.HasNext() {
+               var file ManifestFile
+               if string(dec.Metadata()["format-version"]) == "2" {
+                       file = &manifestFileV2{}
+               } else {
+                       file = &manifestFileV1{}
+               }
+
+               if err := dec.Decode(file); err != nil {
+                       return nil, err
+               }
+               out = append(out, file)
+       }
+
+       return out, dec.Error()
+}
+
// ManifestEntryStatus defines constants for the entry status of
// existing, added or deleted.
type ManifestEntryStatus int8

const (
	// EntryStatusEXISTING marks a file already tracked by a prior snapshot.
	EntryStatusEXISTING ManifestEntryStatus = 0
	// EntryStatusADDED marks a file added by the snapshot that wrote the manifest.
	EntryStatusADDED ManifestEntryStatus = 1
	// EntryStatusDELETED marks a file removed by the snapshot that wrote the manifest.
	EntryStatusDELETED ManifestEntryStatus = 2
)

// ManifestEntryContent defines constants for the type of file contents
// in the file entries. Data, Position based deletes and equality based
// deletes.
type ManifestEntryContent int8

const (
	EntryContentData       ManifestEntryContent = 0
	EntryContentPosDeletes ManifestEntryContent = 1
	EntryContentEqDeletes  ManifestEntryContent = 2
)

// FileFormat defines constants for the format of data files.
type FileFormat string

const (
	AvroFile    FileFormat = "AVRO"
	OrcFile     FileFormat = "ORC"
	ParquetFile FileFormat = "PARQUET"
)
+
// colMap is the avro representation of a single key/value pair; map-typed
// columns are serialized as arrays of these records.
type colMap[K, V any] struct {
	Key   K `avro:"key"`
	Value V `avro:"value"`
}

// avroColMapToMap converts a decoded list of avro key/value pairs into a
// native Go map. It returns nil when the list pointer is nil, i.e. the
// optional field was absent from the file. Duplicate keys keep the last
// value seen, matching plain map assignment semantics.
func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
	if c == nil {
		return nil
	}

	// Pre-size the map to the known pair count to avoid rehash growth.
	out := make(map[K]V, len(*c))
	for _, data := range *c {
		out[data.Key] = data.Value
	}
	return out
}
+
// dataFile is the avro representation of the data_file record embedded in
// a manifest entry. Pointer fields are optional in the schema. The
// unexported map fields cache the decoded key/value pair lists as native
// maps; they are populated lazily, exactly once, guarded by initMaps
// (see initializeMapData). Because it contains a sync.Once, a dataFile
// must not be copied after first use.
type dataFile struct {
	Content          ManifestEntryContent   `avro:"content"`
	Path             string                 `avro:"file_path"`
	Format           FileFormat             `avro:"file_format"`
	PartitionData    map[string]any         `avro:"partition"`
	RecordCount      int64                  `avro:"record_count"`
	FileSize         int64                  `avro:"file_size_in_bytes"`
	BlockSizeInBytes int64                  `avro:"block_size_in_bytes"`
	ColSizes         *[]colMap[int, int64]  `avro:"column_sizes"`
	ValCounts        *[]colMap[int, int64]  `avro:"value_counts"`
	NullCounts       *[]colMap[int, int64]  `avro:"null_value_counts"`
	NaNCounts        *[]colMap[int, int64]  `avro:"nan_value_counts"`
	DistinctCounts   *[]colMap[int, int64]  `avro:"distinct_counts"`
	LowerBounds      *[]colMap[int, []byte] `avro:"lower_bounds"`
	UpperBounds      *[]colMap[int, []byte] `avro:"upper_bounds"`
	Key              *[]byte                `avro:"key_metadata"`
	Splits           *[]int64               `avro:"split_offsets"`
	EqualityIDs      *[]int                 `avro:"equality_ids"`
	SortOrder        *int                   `avro:"sort_order_id"`

	// lazily-built native map views of the avro pair lists above
	colSizeMap     map[int]int64
	valCntMap      map[int]int64
	nullCntMap     map[int]int64
	nanCntMap      map[int]int64
	distinctCntMap map[int]int64
	lowerBoundMap  map[int][]byte
	upperBoundMap  map[int][]byte

	initMaps sync.Once
}
+
+func (d *dataFile) initializeMapData() {
+       d.initMaps.Do(func() {
+               d.colSizeMap = avroColMapToMap(d.ColSizes)
+               d.valCntMap = avroColMapToMap(d.ValCounts)
+               d.nullCntMap = avroColMapToMap(d.NullCounts)
+               d.nanCntMap = avroColMapToMap(d.NaNCounts)
+               d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
+               d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
+               d.upperBoundMap = avroColMapToMap(d.UpperBounds)
+       })
+}
+
+func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
+func (d *dataFile) FilePath() string                  { return d.Path }
+func (d *dataFile) FileFormat() FileFormat            { return d.Format }
+func (d *dataFile) Partition() map[string]any         { return d.PartitionData 
}
+func (d *dataFile) Count() int64                      { return d.RecordCount }
+func (d *dataFile) FileSizeBytes() int64              { return d.FileSize }
+
+func (d *dataFile) ColumnSizes() map[int]int64 {
+       d.initializeMapData()
+       return d.colSizeMap
+}
+
+func (d *dataFile) ValueCounts() map[int]int64 {
+       d.initializeMapData()
+       return d.valCntMap
+}
+
+func (d *dataFile) NullValueCounts() map[int]int64 {
+       d.initializeMapData()
+       return d.nullCntMap
+}
+
+func (d *dataFile) NaNValueCounts() map[int]int64 {
+       d.initializeMapData()
+       return d.nanCntMap
+}
+
+func (d *dataFile) DistinctValueCounts() map[int]int64 {
+       d.initializeMapData()
+       return d.distinctCntMap
+}
+
+func (d *dataFile) LowerBoundValues() map[int][]byte {
+       d.initializeMapData()
+       return d.lowerBoundMap
+}
+
+func (d *dataFile) UpperBoundValues() map[int][]byte {
+       d.initializeMapData()
+       return d.upperBoundMap
+}
+
+func (d *dataFile) KeyMetadata() []byte {
+       if d.Key == nil {
+               return nil
+       }
+       return *d.Key
+}
+
+func (d *dataFile) SplitOffsets() []int64 {
+       if d.Splits == nil {
+               return nil
+       }
+       return *d.Splits
+}
+
+func (d *dataFile) EqualityFieldIDs() []int {
+       if d.EqualityIDs == nil {
+               return nil
+       }
+       return d.EqualityFieldIDs()
+}
+
+func (d *dataFile) SortOrderID() *int { return d.SortOrder }
+
// manifestEntryV1 is the avro representation of a v1 manifest entry.
// SeqNum and FileSeqNum intentionally carry no avro tags: the v1 schema
// has no sequence number fields, so they stay nil and the accessors
// report zero values.
type manifestEntryV1 struct {
	EntryStatus ManifestEntryStatus `avro:"status"`
	Snapshot    int64               `avro:"snapshot_id"`
	SeqNum      *int64
	FileSeqNum  *int64
	Data        dataFile `avro:"data_file"`
}

// inheritSeqNum is a no-op for v1 entries: the snapshot id is a required
// (non-pointer) field in the v1 schema and sequence numbers do not exist.
func (m *manifestEntryV1) inheritSeqNum(manifest ManifestFile) {}

// Status returns the entry's ADDED/EXISTING/DELETED status.
func (m *manifestEntryV1) Status() ManifestEntryStatus { return m.EntryStatus }

// SnapshotID returns the snapshot this entry belongs to.
func (m *manifestEntryV1) SnapshotID() int64 { return m.Snapshot }

// SequenceNum returns the data sequence number, 0 when unset (always for v1).
func (m *manifestEntryV1) SequenceNum() int64 {
	if m.SeqNum == nil {
		return 0
	}
	return *m.SeqNum
}

// FileSequenceNum returns the file sequence number pointer, nil when unset.
func (m *manifestEntryV1) FileSequenceNum() *int64 {
	return m.FileSeqNum
}

// DataFile returns the embedded data file metadata for this entry.
func (m *manifestEntryV1) DataFile() DataFile { return &m.Data }
+
// manifestEntryV2 is the avro representation of a v2 manifest entry.
// Snapshot, SeqNum and FileSeqNum are optional in the schema and may be
// filled in from the enclosing manifest file via inheritSeqNum.
type manifestEntryV2 struct {
	EntryStatus ManifestEntryStatus `avro:"status"`
	Snapshot    *int64              `avro:"snapshot_id"`
	SeqNum      *int64              `avro:"sequence_number"`
	FileSeqNum  *int64              `avro:"file_sequence_number"`
	Data        dataFile            `avro:"data_file"`
}

// inheritSeqNum fills in fields that were null in the file from the
// manifest that contains this entry.
func (m *manifestEntryV2) inheritSeqNum(manifest ManifestFile) {
	// A null snapshot id is inherited from the manifest-list entry.
	if m.Snapshot == nil {
		snap := manifest.SnapshotID()
		m.Snapshot = &snap
	}

	// Sequence numbers are inherited only when the entry was added by the
	// same snapshot that wrote the manifest (status ADDED), or when the
	// manifest's sequence number is 0 (e.g. read via a v1 manifest list).
	manifestSequenceNum := manifest.SequenceNum()
	if m.SeqNum == nil && (manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED) {
		m.SeqNum = &manifestSequenceNum
	}

	if m.FileSeqNum == nil && (manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED) {
		m.FileSeqNum = &manifestSequenceNum
	}
}

// Status returns the entry's ADDED/EXISTING/DELETED status.
func (m *manifestEntryV2) Status() ManifestEntryStatus { return m.EntryStatus }

// SnapshotID returns the snapshot this entry belongs to, 0 when still unset.
func (m *manifestEntryV2) SnapshotID() int64 {
	if m.Snapshot == nil {
		return 0
	}
	return *m.Snapshot
}

// SequenceNum returns the data sequence number, 0 when still unset.
func (m *manifestEntryV2) SequenceNum() int64 {
	if m.SeqNum == nil {
		return 0
	}
	return *m.SeqNum
}

// FileSequenceNum returns the file sequence number pointer, nil when unset.
func (m *manifestEntryV2) FileSequenceNum() *int64 {
	return m.FileSeqNum
}

// DataFile returns the embedded data file metadata for this entry.
func (m *manifestEntryV2) DataFile() DataFile { return &m.Data }
+
// DataFile is the interface for reading the information about a
// given data file indicated by an entry in a manifest list.
// It is implemented by the internal dataFile avro representation.
type DataFile interface {
	// ContentType is the type of the content stored by the data file,
	// either Data, Equality deletes, or Position deletes. All v1 files
	// are Data files.
	ContentType() ManifestEntryContent
	// FilePath is the full URI for the file, complete with FS scheme.
	FilePath() string
	// FileFormat is the format of the data file, AVRO, Orc, or Parquet.
	FileFormat() FileFormat
	// Partition returns a mapping of field name to partition value for
	// each of the partition spec's fields.
	Partition() map[string]any
	// Count returns the number of records in this file.
	Count() int64
	// FileSizeBytes is the total file size in bytes.
	FileSizeBytes() int64
	// ColumnSizes is a mapping from column id to the total size on disk
	// of all regions that store the column. Does not include bytes
	// necessary to read other columns, like footers. Map will be nil for
	// row-oriented formats (avro).
	ColumnSizes() map[int]int64
	// ValueCounts is a mapping from column id to the number of values
	// in the column, including null and NaN values.
	ValueCounts() map[int]int64
	// NullValueCounts is a mapping from column id to the number of
	// null values in the column.
	NullValueCounts() map[int]int64
	// NaNValueCounts is a mapping from column id to the number of NaN
	// values in the column.
	NaNValueCounts() map[int]int64
	// DistinctValueCounts is a mapping from column id to the number of
	// distinct values in the column. Distinct counts must be derived
	// using values in the file by counting or using sketches, but not
	// using methods like merging existing distinct counts.
	DistinctValueCounts() map[int]int64
	// LowerBoundValues is a mapping from column id to the lower bounded
	// value of the column, serialized as binary. Each value in the column
	// must be less than or equal to all non-null, non-NaN values in the
	// column for the file.
	LowerBoundValues() map[int][]byte
	// UpperBoundValues is a mapping from column id to the upper bounded
	// value of the column, serialized as binary. Each value in the column
	// must be greater than or equal to all non-null, non-NaN values in
	// the column for the file.
	UpperBoundValues() map[int][]byte
	// KeyMetadata is implementation-specific key metadata for encryption.
	KeyMetadata() []byte
	// SplitOffsets are the split offsets for the data file. For example,
	// all row group offsets in a Parquet file. Must be sorted ascending.
	SplitOffsets() []int64
	// EqualityFieldIDs are used to determine row equality in equality
	// delete files. It is required when the content type is
	// EntryContentEqDeletes.
	EqualityFieldIDs() []int
	// SortOrderID returns the id representing the sort order for this
	// file, or nil if there is no sort order.
	SortOrderID() *int
}
+
// ManifestEntry is an interface for both v1 and v2 manifest entries.
// The unexported inheritSeqNum method keeps the interface implementable
// only by the in-package entry types.
type ManifestEntry interface {
	// Status returns the type of the file tracked by this entry.
	// Deletes are informational only and not used in scans.
	Status() ManifestEntryStatus
	// SnapshotID is the id where the file was added, or deleted,
	// if null it is inherited from the manifest list.
	SnapshotID() int64
	// SequenceNum returns the data sequence number of the file.
	// If it was null and the status is EntryStatusADDED then it
	// is inherited from the manifest list.
	SequenceNum() int64
	// FileSequenceNum returns the file sequence number indicating
	// when the file was added. If it was null and the status is
	// EntryStatusADDED then it is inherited from the manifest list.
	FileSequenceNum() *int64
	// DataFile provides the information about the data file indicated
	// by this manifest entry.
	DataFile() DataFile

	// inheritSeqNum fills null snapshot/sequence fields from the
	// containing manifest file; called after decoding each entry.
	inheritSeqNum(manifest ManifestFile)
}
+
+var PositionalDeleteSchema = NewSchema(0,
+       NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: 
"file_path", Required: true},
+       NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", 
Required: true},
+)
diff --git a/manifest_test.go b/manifest_test.go
new file mode 100644
index 0000000..74d063e
--- /dev/null
+++ b/manifest_test.go
@@ -0,0 +1,773 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "bytes"
+       "testing"
+       "time"
+
+       "github.com/apache/iceberg-go/internal"
+       "github.com/hamba/avro/v2/ocf"
+       "github.com/stretchr/testify/suite"
+)
+
var (
	falseBool                   = false
	snapshotID            int64 = 9182715666859759686
	addedRows             int64 = 237993
	// manifestFileRecordsV1 holds a single v1 manifest-file record,
	// built via the builder API, used for the manifest-list round trip.
	manifestFileRecordsV1 = []ManifestFile{
		NewManifestV1Builder("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
			7989, 0).
			AddedSnapshotID(snapshotID).
			AddedFiles(3).
			ExistingFiles(0).
			DeletedFiles(0).
			AddedRows(addedRows).
			ExistingRows(0).
			DeletedRows(0).
			Partitions([]FieldSummary{{
				ContainsNull: true, ContainsNaN: &falseBool,
				LowerBound: &[]byte{0x01, 0x00, 0x00, 0x00},
				UpperBound: &[]byte{0x02, 0x00, 0x00, 0x00},
			}}).Build()}

	// manifestFileRecordsV2 mirrors the v1 record but exercises the
	// v2-only fields (content type and sequence numbers).
	manifestFileRecordsV2 = []ManifestFile{
		NewManifestV2Builder("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
			7989, 0, ManifestContentDeletes, snapshotID).
			SequenceNum(3, 3).
			AddedFiles(3).
			ExistingFiles(0).
			DeletedFiles(0).
			AddedRows(addedRows).
			ExistingRows(0).
			DeletedRows(0).
			Partitions([]FieldSummary{{
				ContainsNull: true,
				ContainsNaN:  &falseBool,
				LowerBound:   &[]byte{0x01, 0x00, 0x00, 0x00},
				UpperBound:   &[]byte{0x02, 0x00, 0x00, 0x00},
			}}).Build()}

	entrySnapshotID        int64 = 8744736658442914487
	intZero                      = 0
	// manifestEntryV1Records are two ADDED data-file entries serialized
	// with the v1 manifest-entry schema.
	manifestEntryV1Records = []*manifestEntryV1{
		{
			EntryStatus: EntryStatusADDED,
			Snapshot:    entrySnapshotID,
			Data: dataFile{
				// bad value for Content but this field doesn't exist in V1
				// so it shouldn't get written and shouldn't be read back out
				// so the roundtrip test asserts that we get the default value
				// back out.
				Content:          EntryContentEqDeletes,
				Path:             "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
				Format:           ParquetFile,
				PartitionData:    map[string]any{"VendorID": int(1), "tpep_pickup_datetime": time.Unix(1925, 0)},
				RecordCount:      19513,
				FileSize:         388872,
				BlockSizeInBytes: 67108864,
				// Column statistics are stored as key/value pairs (Avro has
				// no int-keyed maps); keys are field IDs.
				ColSizes: &[]colMap[int, int64]{
					{Key: 1, Value: 53},
					{Key: 2, Value: 98153},
					{Key: 3, Value: 98693},
					{Key: 4, Value: 53},
					{Key: 5, Value: 53},
					{Key: 6, Value: 53},
					{Key: 7, Value: 17425},
					{Key: 8, Value: 18528},
					{Key: 9, Value: 53},
					{Key: 10, Value: 44788},
					{Key: 11, Value: 35571},
					{Key: 12, Value: 53},
					{Key: 13, Value: 1243},
					{Key: 14, Value: 2355},
					{Key: 15, Value: 12750},
					{Key: 16, Value: 4029},
					{Key: 17, Value: 110},
					{Key: 18, Value: 47194},
					{Key: 19, Value: 2948},
				},
				ValCounts: &[]colMap[int, int64]{
					{Key: 1, Value: 19513},
					{Key: 2, Value: 19513},
					{Key: 3, Value: 19513},
					{Key: 4, Value: 19513},
					{Key: 5, Value: 19513},
					{Key: 6, Value: 19513},
					{Key: 7, Value: 19513},
					{Key: 8, Value: 19513},
					{Key: 9, Value: 19513},
					{Key: 10, Value: 19513},
					{Key: 11, Value: 19513},
					{Key: 12, Value: 19513},
					{Key: 13, Value: 19513},
					{Key: 14, Value: 19513},
					{Key: 15, Value: 19513},
					{Key: 16, Value: 19513},
					{Key: 17, Value: 19513},
					{Key: 18, Value: 19513},
					{Key: 19, Value: 19513},
				},
				NullCounts: &[]colMap[int, int64]{
					{Key: 1, Value: 19513},
					{Key: 2, Value: 0},
					{Key: 3, Value: 0},
					{Key: 4, Value: 19513},
					{Key: 5, Value: 19513},
					{Key: 6, Value: 19513},
					{Key: 7, Value: 0},
					{Key: 8, Value: 0},
					{Key: 9, Value: 19513},
					{Key: 10, Value: 0},
					{Key: 11, Value: 0},
					{Key: 12, Value: 19513},
					{Key: 13, Value: 0},
					{Key: 14, Value: 0},
					{Key: 15, Value: 0},
					{Key: 16, Value: 0},
					{Key: 17, Value: 0},
					{Key: 18, Value: 0},
					{Key: 19, Value: 0},
				},
				NaNCounts: &[]colMap[int, int64]{
					{Key: 16, Value: 0},
					{Key: 17, Value: 0},
					{Key: 18, Value: 0},
					{Key: 19, Value: 0},
					{Key: 10, Value: 0},
					{Key: 11, Value: 0},
					{Key: 12, Value: 0},
					{Key: 13, Value: 0},
					{Key: 14, Value: 0},
					{Key: 15, Value: 0},
				},
				LowerBounds: &[]colMap[int, []byte]{
					{Key: 2, Value: []byte("2020-04-01 00:00")},
					{Key: 3, Value: []byte("2020-04-01 00:12")},
					{Key: 7, Value: []byte{0x03, 0x00, 0x00, 0x00}},
					{Key: 8, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 10, Value: []byte{0xf6, 0x28, 0x5c, 0x8f, 0xc2, 0x05, 'S', 0xc0}},
					{Key: 11, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0xe0, 0xbf}},
					{Key: 15, Value: []byte{')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0}},
					{Key: 16, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 17, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 18, Value: []byte{0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0}},
					{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, 0xc0}},
				},
				UpperBounds: &[]colMap[int, []byte]{
					{Key: 2, Value: []byte("2020-04-30 23:5:")},
					{Key: 3, Value: []byte("2020-05-01 00:41")},
					{Key: 7, Value: []byte{'\t', 0x01, 0x00, 0x00}},
					{Key: 8, Value: []byte{'\t', 0x01, 0x00, 0x00}},
					{Key: 10, Value: []byte{0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'}},
					{Key: 11, Value: []byte{0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'}},
					{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0x12, '@'}},
					{Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0xe0, '?'}},
					{Key: 15, Value: []byte{'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'}},
					{Key: 16, Value: []byte{0, 0, 0, 0, 0, '`', 'B', '@'}},
					{Key: 17, Value: []byte{'3', '3', '3', '3', '3', '3', 0xd3, '?'}},
					{Key: 18, Value: []byte{0, 0, 0, 0, 0, 0x18, 'b', '@'}},
					{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, '@'}},
				},
				Splits:    &[]int64{4},
				SortOrder: &intZero,
			},
		},
		{
			EntryStatus: EntryStatusADDED,
			Snapshot:    8744736658442914487,
			Data: dataFile{
				Path:             "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
				Format:           ParquetFile,
				PartitionData:    map[string]any{"VendorID": int(1), "tpep_pickup_datetime": time.Unix(1925, 0)},
				RecordCount:      95050,
				FileSize:         1265950,
				BlockSizeInBytes: 67108864,
				ColSizes: &[]colMap[int, int64]{
					{Key: 1, Value: 318},
					{Key: 2, Value: 329806},
					{Key: 3, Value: 331632},
					{Key: 4, Value: 15343},
					{Key: 5, Value: 2351},
					{Key: 6, Value: 3389},
					{Key: 7, Value: 71269},
					{Key: 8, Value: 76429},
					{Key: 9, Value: 16383},
					{Key: 10, Value: 86992},
					{Key: 11, Value: 89608},
					{Key: 12, Value: 265},
					{Key: 13, Value: 19377},
					{Key: 14, Value: 1692},
					{Key: 15, Value: 76162},
					{Key: 16, Value: 4354},
					{Key: 17, Value: 759},
					{Key: 18, Value: 120650},
					{Key: 19, Value: 11804},
				},
				ValCounts: &[]colMap[int, int64]{
					{Key: 1, Value: 95050},
					{Key: 2, Value: 95050},
					{Key: 3, Value: 95050},
					{Key: 4, Value: 95050},
					{Key: 5, Value: 95050},
					{Key: 6, Value: 95050},
					{Key: 7, Value: 95050},
					{Key: 8, Value: 95050},
					{Key: 9, Value: 95050},
					{Key: 10, Value: 95050},
					{Key: 11, Value: 95050},
					{Key: 12, Value: 95050},
					{Key: 13, Value: 95050},
					{Key: 14, Value: 95050},
					{Key: 15, Value: 95050},
					{Key: 16, Value: 95050},
					{Key: 17, Value: 95050},
					{Key: 18, Value: 95050},
					{Key: 19, Value: 95050},
				},
				NullCounts: &[]colMap[int, int64]{
					{Key: 1, Value: 0},
					{Key: 2, Value: 0},
					{Key: 3, Value: 0},
					{Key: 4, Value: 0},
					{Key: 5, Value: 0},
					{Key: 6, Value: 0},
					{Key: 7, Value: 0},
					{Key: 8, Value: 0},
					{Key: 9, Value: 0},
					{Key: 10, Value: 0},
					{Key: 11, Value: 0},
					{Key: 12, Value: 95050},
					{Key: 13, Value: 0},
					{Key: 14, Value: 0},
					{Key: 15, Value: 0},
					{Key: 16, Value: 0},
					{Key: 17, Value: 0},
					{Key: 18, Value: 0},
					{Key: 19, Value: 0},
				},
				NaNCounts: &[]colMap[int, int64]{
					{Key: 16, Value: 0},
					{Key: 17, Value: 0},
					{Key: 18, Value: 0},
					{Key: 19, Value: 0},
					{Key: 10, Value: 0},
					{Key: 11, Value: 0},
					{Key: 12, Value: 0},
					{Key: 13, Value: 0},
					{Key: 14, Value: 0},
					{Key: 15, Value: 0},
				},
				LowerBounds: &[]colMap[int, []byte]{
					{Key: 1, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 2, Value: []byte("2020-04-01 00:00")},
					{Key: 3, Value: []byte("2020-04-01 00:13")},
					{Key: 4, Value: []byte{0x00, 0x00, 0x00, 0x00}},
					{Key: 5, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 6, Value: []byte("N")},
					{Key: 7, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 8, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 9, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 10, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 11, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 15, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 16, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 17, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 18, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
					{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}},
				},
				UpperBounds: &[]colMap[int, []byte]{
					{Key: 1, Value: []byte{0x01, 0x00, 0x00, 0x00}},
					{Key: 2, Value: []byte("2020-04-30 23:5:")},
					{Key: 3, Value: []byte("2020-05-01 00:1:")},
					{Key: 4, Value: []byte{0x06, 0x00, 0x00, 0x00}},
					{Key: 5, Value: []byte{'c', 0x00, 0x00, 0x00}},
					{Key: 6, Value: []byte("Y")},
					{Key: 7, Value: []byte{'\t', 0x01, 0x00, 0x00}},
					{Key: 8, Value: []byte{'\t', 0x01, 0x00, 0x00}},
					{Key: 9, Value: []byte{0x04, 0x01, 0x00, 0x00}},
					{Key: 10, Value: []byte{'\\', 0x8f, 0xc2, 0xf5, '(', '8', 0x8c, '@'}},
					{Key: 11, Value: []byte{0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', 'f', '@'}},
					{Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0x1c, '@'}},
					{Key: 14, Value: []byte{0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xf1, '?'}},
					{Key: 15, Value: []byte{0, 0, 0, 0, 0, 0, 'Y', '@'}},
					{Key: 16, Value: []byte{0, 0, 0, 0, 0, 0xb0, 'X', '@'}},
					{Key: 17, Value: []byte{'3', '3', '3', '3', '3', '3', 0xd3, '?'}},
					{Key: 18, Value: []byte{0xc3, 0xf5, '(', '\\', 0x8f, ':', 0x8c, '@'}},
					{Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, '@'}},
				},
				Splits:    &[]int64{4},
				SortOrder: &intZero,
			},
		},
	}

	// manifestEntryV2Records mirrors the v1 entries with the v2 layout:
	// the snapshot id is optional (pointer) so it can be inherited from
	// the manifest list when absent. The dataFile fields are shared with
	// the v1 fixtures so both versions describe identical files.
	manifestEntryV2Records = []*manifestEntryV2{
		{
			EntryStatus: EntryStatusADDED,
			Snapshot:    &entrySnapshotID,
			Data: dataFile{
				Path:             manifestEntryV1Records[0].Data.Path,
				Format:           manifestEntryV1Records[0].Data.Format,
				PartitionData:    manifestEntryV1Records[0].Data.PartitionData,
				RecordCount:      manifestEntryV1Records[0].Data.RecordCount,
				FileSize:         manifestEntryV1Records[0].Data.FileSize,
				BlockSizeInBytes: manifestEntryV1Records[0].Data.BlockSizeInBytes,
				ColSizes:         manifestEntryV1Records[0].Data.ColSizes,
				ValCounts:        manifestEntryV1Records[0].Data.ValCounts,
				NullCounts:       manifestEntryV1Records[0].Data.NullCounts,
				NaNCounts:        manifestEntryV1Records[0].Data.NaNCounts,
				LowerBounds:      manifestEntryV1Records[0].Data.LowerBounds,
				UpperBounds:      manifestEntryV1Records[0].Data.UpperBounds,
				Splits:           manifestEntryV1Records[0].Data.Splits,
				SortOrder:        manifestEntryV1Records[0].Data.SortOrder,
			},
		},
		{
			EntryStatus: EntryStatusADDED,
			Snapshot:    &entrySnapshotID,
			Data: dataFile{
				Path:             manifestEntryV1Records[1].Data.Path,
				Format:           manifestEntryV1Records[1].Data.Format,
				PartitionData:    manifestEntryV1Records[1].Data.PartitionData,
				RecordCount:      manifestEntryV1Records[1].Data.RecordCount,
				FileSize:         manifestEntryV1Records[1].Data.FileSize,
				BlockSizeInBytes: manifestEntryV1Records[1].Data.BlockSizeInBytes,
				ColSizes:         manifestEntryV1Records[1].Data.ColSizes,
				ValCounts:        manifestEntryV1Records[1].Data.ValCounts,
				NullCounts:       manifestEntryV1Records[1].Data.NullCounts,
				NaNCounts:        manifestEntryV1Records[1].Data.NaNCounts,
				LowerBounds:      manifestEntryV1Records[1].Data.LowerBounds,
				UpperBounds:      manifestEntryV1Records[1].Data.UpperBounds,
				Splits:           manifestEntryV1Records[1].Data.Splits,
				SortOrder:        manifestEntryV1Records[1].Data.SortOrder,
			},
		},
	}
)
+
// ManifestTestSuite round-trips manifest lists and manifest entries
// through their Avro OCF representations entirely in memory.
type ManifestTestSuite struct {
	suite.Suite

	// Serialized Avro data for each format version, populated once in
	// SetupSuite and consumed by the individual tests.
	v1ManifestList    bytes.Buffer
	v1ManifestEntries bytes.Buffer

	v2ManifestList    bytes.Buffer
	v2ManifestEntries bytes.Buffer
}
+
+func (m *ManifestTestSuite) writeManifestList() {
+       enc, err := 
ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestListV1Key).String(),
+               &m.v1ManifestList, ocf.WithMetadata(map[string][]byte{
+                       "avro.codec": []byte("deflate"),
+               }),
+               ocf.WithCodec(ocf.Deflate))
+       m.Require().NoError(err)
+
+       m.Require().NoError(enc.Encode(manifestFileRecordsV1[0]))
+       enc.Close()
+
+       enc, err = 
ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestListV2Key).String(),
+               &m.v2ManifestList, ocf.WithMetadata(map[string][]byte{
+                       "format-version": []byte("2"),
+                       "avro.codec":     []byte("deflate"),
+               }), ocf.WithCodec(ocf.Deflate))
+       m.Require().NoError(err)
+
+       m.Require().NoError(enc.Encode(manifestFileRecordsV2[0]))
+       enc.Close()
+}
+
+func (m *ManifestTestSuite) writeManifestEntries() {
+       enc, err := 
ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestEntryV1Key).String(),
 &m.v1ManifestEntries,
+               ocf.WithMetadata(map[string][]byte{
+                       "format-version": []byte("1"),
+               }), ocf.WithCodec(ocf.Deflate))
+       m.Require().NoError(err)
+
+       for _, ent := range manifestEntryV1Records {
+               m.Require().NoError(enc.Encode(ent))
+       }
+       m.Require().NoError(enc.Close())
+
+       enc, err = 
ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestEntryV2Key).String(),
+               &m.v2ManifestEntries, ocf.WithMetadata(map[string][]byte{
+                       "format-version": []byte("2"),
+                       "avro.codec":     []byte("deflate"),
+               }), ocf.WithCodec(ocf.Deflate))
+       m.Require().NoError(err)
+
+       for _, ent := range manifestEntryV2Records {
+               m.Require().NoError(enc.Encode(ent))
+       }
+       m.Require().NoError(enc.Close())
+}
+
// SetupSuite builds the in-memory Avro manifest lists and manifest
// entry files once before any test in the suite runs.
func (m *ManifestTestSuite) SetupSuite() {
	m.writeManifestList()
	m.writeManifestEntries()
}
+
// TestManifestEntriesV1 reads the serialized v1 manifest entries back
// through a mocked filesystem and verifies every decoded field against
// the fixture values, including that v2-only fields come back as their
// zero values.
func (m *ManifestTestSuite) TestManifestEntriesV1() {
	var mockfs internal.MockFS
	manifest := manifestFileV1{
		Path: manifestFileRecordsV1[0].FilePath(),
	}

	// Serve the in-memory Avro bytes when the manifest path is opened.
	mockfs.Test(m.T())
	mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{
		Contents: bytes.NewReader(m.v1ManifestEntries.Bytes())}, nil)
	defer mockfs.AssertExpectations(m.T())
	entries, err := manifest.FetchEntries(&mockfs, false)
	m.Require().NoError(err)
	m.Len(entries, 2)
	// The manifest file struct was built with only a path, so all of its
	// counters remain at their zero values.
	m.Zero(manifest.PartitionSpecID())
	m.Zero(manifest.SnapshotID())
	m.Zero(manifest.AddedDataFiles())
	m.Zero(manifest.ExistingDataFiles())
	m.Zero(manifest.DeletedDataFiles())
	m.Zero(manifest.ExistingRows())
	m.Zero(manifest.DeletedRows())
	m.Zero(manifest.AddedRows())

	entry1 := entries[0]

	m.Equal(EntryStatusADDED, entry1.Status())
	m.EqualValues(8744736658442914487, entry1.SnapshotID())
	m.Zero(entry1.SequenceNum())
	m.Nil(entry1.FileSequenceNum())

	datafile := entry1.DataFile()
	// The fixture wrote EntryContentEqDeletes, but v1 has no content
	// field, so the default (data) must come back out.
	m.Equal(EntryContentData, datafile.ContentType())
	m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet", datafile.FilePath())
	m.Equal(ParquetFile, datafile.FileFormat())
	m.EqualValues(19513, datafile.Count())
	m.EqualValues(388872, datafile.FileSizeBytes())
	// The key/value-pair statistics should decode into plain maps.
	m.Equal(map[int]int64{
		1:  53,
		2:  98153,
		3:  98693,
		4:  53,
		5:  53,
		6:  53,
		7:  17425,
		8:  18528,
		9:  53,
		10: 44788,
		11: 35571,
		12: 53,
		13: 1243,
		14: 2355,
		15: 12750,
		16: 4029,
		17: 110,
		18: 47194,
		19: 2948,
	}, datafile.ColumnSizes())
	m.Equal(map[int]int64{
		1:  19513,
		2:  19513,
		3:  19513,
		4:  19513,
		5:  19513,
		6:  19513,
		7:  19513,
		8:  19513,
		9:  19513,
		10: 19513,
		11: 19513,
		12: 19513,
		13: 19513,
		14: 19513,
		15: 19513,
		16: 19513,
		17: 19513,
		18: 19513,
		19: 19513,
	}, datafile.ValueCounts())
	m.Equal(map[int]int64{
		1:  19513,
		2:  0,
		3:  0,
		4:  19513,
		5:  19513,
		6:  19513,
		7:  0,
		8:  0,
		9:  19513,
		10: 0,
		11: 0,
		12: 19513,
		13: 0,
		14: 0,
		15: 0,
		16: 0,
		17: 0,
		18: 0,
		19: 0,
	}, datafile.NullValueCounts())
	m.Equal(map[int]int64{
		16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0,
	}, datafile.NaNValueCounts())

	m.Equal(map[int][]byte{
		2:  []byte("2020-04-01 00:00"),
		3:  []byte("2020-04-01 00:12"),
		7:  {0x03, 0x00, 0x00, 0x00},
		8:  {0x01, 0x00, 0x00, 0x00},
		10: {0xf6, '(', '\\', 0x8f, 0xc2, 0x05, 'S', 0xc0},
		11: {0, 0, 0, 0, 0, 0, 0, 0},
		13: {0, 0, 0, 0, 0, 0, 0, 0},
		14: {0, 0, 0, 0, 0, 0, 0xe0, 0xbf},
		15: {')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0},
		16: {0, 0, 0, 0, 0, 0, 0, 0},
		17: {0, 0, 0, 0, 0, 0, 0, 0},
		18: {0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0},
		19: {0, 0, 0, 0, 0, 0, 0x04, 0xc0},
	}, datafile.LowerBoundValues())

	m.Equal(map[int][]byte{
		2:  []byte("2020-04-30 23:5:"),
		3:  []byte("2020-05-01 00:41"),
		7:  {'\t', 0x01, 0, 0},
		8:  {'\t', 0x01, 0, 0},
		10: {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'},
		11: {0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'},
		13: {0, 0, 0, 0, 0, 0, 0x12, '@'},
		14: {0, 0, 0, 0, 0, 0, 0xe0, '?'},
		15: {'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'},
		16: {0, 0, 0, 0, 0, '`', 'B', '@'},
		17: {'3', '3', '3', '3', '3', '3', 0xd3, '?'},
		18: {0, 0, 0, 0, 0, 0x18, 'b', '@'},
		19: {0, 0, 0, 0, 0, 0, 0x04, '@'},
	}, datafile.UpperBoundValues())

	m.Nil(datafile.KeyMetadata())
	m.Equal([]int64{4}, datafile.SplitOffsets())
	m.Nil(datafile.EqualityFieldIDs())
	m.Zero(*datafile.SortOrderID())
}
+
+func (m *ManifestTestSuite) TestReadManifestListV1() {
+       list, err := ReadManifestList(&m.v1ManifestList)
+       m.Require().NoError(err)
+
+       m.Len(list, 1)
+       m.Equal(1, list[0].Version())
+       m.EqualValues(7989, list[0].Length())
+       m.Equal(ManifestContentData, list[0].ManifestContent())
+       m.Zero(list[0].SequenceNum())
+       m.Zero(list[0].MinSequenceNum())
+       m.EqualValues(9182715666859759686, list[0].SnapshotID())
+       m.EqualValues(3, list[0].AddedDataFiles())
+       m.True(list[0].HasAddedFiles())
+       m.Zero(list[0].ExistingDataFiles())
+       m.False(list[0].HasExistingFiles())
+       m.Zero(list[0].DeletedDataFiles())
+       m.Equal(addedRows, list[0].AddedRows())
+       m.Zero(list[0].ExistingRows())
+       m.Zero(list[0].DeletedRows())
+       m.Nil(list[0].KeyMetadata())
+       m.Zero(list[0].PartitionSpecID())
+       m.Equal(snapshotID, list[0].SnapshotID())
+
+       part := list[0].Partitions()[0]
+       m.True(part.ContainsNull)
+       m.False(*part.ContainsNaN)
+       m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
+       m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
+}
+
+func (m *ManifestTestSuite) TestReadManifestListV2() {
+       list, err := ReadManifestList(&m.v2ManifestList)
+       m.Require().NoError(err)
+
+       
m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
 list[0].FilePath())
+       m.Len(list, 1)
+       m.Equal(2, list[0].Version())
+       m.EqualValues(7989, list[0].Length())
+       m.Equal(ManifestContentDeletes, list[0].ManifestContent())
+       m.EqualValues(3, list[0].SequenceNum())
+       m.EqualValues(3, list[0].MinSequenceNum())
+       m.EqualValues(9182715666859759686, list[0].SnapshotID())
+       m.EqualValues(3, list[0].AddedDataFiles())
+       m.True(list[0].HasAddedFiles())
+       m.Zero(list[0].ExistingDataFiles())
+       m.False(list[0].HasExistingFiles())
+       m.Zero(list[0].DeletedDataFiles())
+       m.Equal(addedRows, list[0].AddedRows())
+       m.Zero(list[0].ExistingRows())
+       m.Zero(list[0].DeletedRows())
+       m.Nil(list[0].KeyMetadata())
+       m.Zero(list[0].PartitionSpecID())
+
+       part := list[0].Partitions()[0]
+       m.True(part.ContainsNull)
+       m.False(*part.ContainsNaN)
+       m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
+       m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
+}
+
// TestManifestEntriesV2 reads the v2 manifest-entry fixture through a mocked
// filesystem and verifies the first entry's data-file metadata, including the
// per-column statistics decoded from the avro payload.
func (m *ManifestTestSuite) TestManifestEntriesV2() {
	var mockfs internal.MockFS
	manifest := manifestFileV2{
		Path: manifestFileRecordsV2[0].FilePath(),
	}

	// Stub Open so FetchEntries reads the in-memory v2 entries buffer
	// instead of touching the real filesystem; AssertExpectations ensures
	// Open was actually called with the manifest's path.
	mockfs.Test(m.T())
	mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{
		Contents: bytes.NewReader(m.v2ManifestEntries.Bytes())}, nil)
	defer mockfs.AssertExpectations(m.T())
	entries, err := manifest.FetchEntries(&mockfs, false)
	m.Require().NoError(err)
	m.Len(entries, 2)
	// The manifest struct itself was built with only Path set, so all of
	// its counters remain at their zero values.
	m.Zero(manifest.PartitionSpecID())
	m.Zero(manifest.SnapshotID())
	m.Zero(manifest.AddedDataFiles())
	m.Zero(manifest.ExistingDataFiles())
	m.Zero(manifest.DeletedDataFiles())
	m.Zero(manifest.ExistingRows())
	m.Zero(manifest.DeletedRows())
	m.Zero(manifest.AddedRows())

	entry1 := entries[0]

	m.Equal(EntryStatusADDED, entry1.Status())
	m.Equal(entrySnapshotID, entry1.SnapshotID())
	m.Zero(entry1.SequenceNum())
	m.Zero(*entry1.FileSequenceNum())

	// Data-file metadata for the first entry.
	datafile := entry1.DataFile()
	m.Equal(EntryContentData, datafile.ContentType())
	m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
		datafile.FilePath())
	m.Equal(ParquetFile, datafile.FileFormat())
	m.EqualValues(19513, datafile.Count())
	m.EqualValues(388872, datafile.FileSizeBytes())
	// On-disk size in bytes for each column id.
	m.Equal(map[int]int64{
		1:  53,
		2:  98153,
		3:  98693,
		4:  53,
		5:  53,
		6:  53,
		7:  17425,
		8:  18528,
		9:  53,
		10: 44788,
		11: 35571,
		12: 53,
		13: 1243,
		14: 2355,
		15: 12750,
		16: 4029,
		17: 110,
		18: 47194,
		19: 2948,
	}, datafile.ColumnSizes())
	// Every column has one value per row (19513 rows).
	m.Equal(map[int]int64{
		1:  19513,
		2:  19513,
		3:  19513,
		4:  19513,
		5:  19513,
		6:  19513,
		7:  19513,
		8:  19513,
		9:  19513,
		10: 19513,
		11: 19513,
		12: 19513,
		13: 19513,
		14: 19513,
		15: 19513,
		16: 19513,
		17: 19513,
		18: 19513,
		19: 19513,
	}, datafile.NullValueCounts() /* placeholder comment removed */)
	// Null counts: several columns are entirely null (count == row count).
	m.Equal(map[int]int64{
		1:  19513,
		2:  0,
		3:  0,
		4:  19513,
		5:  19513,
		6:  19513,
		7:  0,
		8:  0,
		9:  19513,
		10: 0,
		11: 0,
		12: 19513,
		13: 0,
		14: 0,
		15: 0,
		16: 0,
		17: 0,
		18: 0,
		19: 0,
	}, datafile.NullValueCounts())
	// NaN counts are recorded only for the floating-point columns.
	m.Equal(map[int]int64{
		16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0,
	}, datafile.NaNValueCounts())

	// Per-column lower bounds as raw little-endian byte encodings (string
	// columns are truncated timestamps, numeric columns are 4/8 bytes).
	m.Equal(map[int][]byte{
		2:  []byte("2020-04-01 00:00"),
		3:  []byte("2020-04-01 00:12"),
		7:  {0x03, 0x00, 0x00, 0x00},
		8:  {0x01, 0x00, 0x00, 0x00},
		10: {0xf6, '(', '\\', 0x8f, 0xc2, 0x05, 'S', 0xc0},
		11: {0, 0, 0, 0, 0, 0, 0, 0},
		13: {0, 0, 0, 0, 0, 0, 0, 0},
		14: {0, 0, 0, 0, 0, 0, 0xe0, 0xbf},
		15: {')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0},
		16: {0, 0, 0, 0, 0, 0, 0, 0},
		17: {0, 0, 0, 0, 0, 0, 0, 0},
		18: {0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0},
		19: {0, 0, 0, 0, 0, 0, 0x04, 0xc0},
	}, datafile.LowerBoundValues())

	// Per-column upper bounds, same encoding as the lower bounds.
	m.Equal(map[int][]byte{
		2:  []byte("2020-04-30 23:5:"),
		3:  []byte("2020-05-01 00:41"),
		7:  {'\t', 0x01, 0, 0},
		8:  {'\t', 0x01, 0, 0},
		10: {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'},
		11: {0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'},
		13: {0, 0, 0, 0, 0, 0, 0x12, '@'},
		14: {0, 0, 0, 0, 0, 0, 0xe0, '?'},
		15: {'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'},
		16: {0, 0, 0, 0, 0, '`', 'B', '@'},
		17: {'3', '3', '3', '3', '3', '3', 0xd3, '?'},
		18: {0, 0, 0, 0, 0, 0x18, 'b', '@'},
		19: {0, 0, 0, 0, 0, 0, 0x04, '@'},
	}, datafile.UpperBoundValues())

	// Optional fields not present in this fixture.
	m.Nil(datafile.KeyMetadata())
	m.Equal([]int64{4}, datafile.SplitOffsets())
	m.Nil(datafile.EqualityFieldIDs())
	m.Zero(*datafile.SortOrderID())
}
+
+func TestManifests(t *testing.T) {
+       suite.Run(t, new(ManifestTestSuite))
+}

Reply via email to