This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 85896524 fix(table): support gzipped metadata json (#610)
85896524 is described below
commit 85896524f4fcc6d539e05fa96482e5d8f7eda351
Author: Alex <[email protected]>
AuthorDate: Fri Oct 24 14:15:09 2025 -0600
fix(table): support gzipped metadata json (#610)
This PR adds support for gzipped metadata json files.
- iceberg spec reference:
https://iceberg.apache.org/spec/#naming-for-gzip-compressed-metadata-json-files
- pyiceberg implementation:
https://github.com/apache/iceberg-python/blob/7d5c58d5b2b2ef914cf0cf8577a97b01221afe3a/pyiceberg/serializers.py#L35
While the spec (linked above) implies `.gz.metadata.json` is the
standard, it mentions that the java implementation also supports reading
`metadata.json.gz`. As written, this PR supports both, but I will defer
to your guidance. FWIW, pyiceberg only supports `.gz.metadata.json`.
---
table/table.go | 66 ++++++++++++++++++++++++++++++++++++++---------------
table/table_test.go | 35 ++++++++++++++++++++++++++++
2 files changed, 83 insertions(+), 18 deletions(-)
diff --git a/table/table.go b/table/table.go
index bd2ab8cb..8abf6fb8 100644
--- a/table/table.go
+++ b/table/table.go
@@ -18,22 +18,26 @@
package table
import (
+ "bytes"
+ "compress/gzip"
"context"
+ "io"
"iter"
"log"
"runtime"
"slices"
+ "strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/internal"
- "github.com/apache/iceberg-go/io"
+ icebergio "github.com/apache/iceberg-go/io"
tblutils "github.com/apache/iceberg-go/table/internal"
"golang.org/x/sync/errgroup"
)
-type FSysF func(ctx context.Context) (io.IO, error)
+type FSysF func(ctx context.Context) (icebergio.IO, error)
type Identifier = []string
@@ -56,19 +60,19 @@ func (t Table) Equals(other Table) bool {
t.metadata.Equals(other.metadata)
}
-func (t Table) Identifier() Identifier { return t.identifier }
-func (t Table) Metadata() Metadata { return t.metadata }
-func (t Table) MetadataLocation() string { return t.metadataLocation }
-func (t Table) FS(ctx context.Context) (io.IO, error) { return t.fsF(ctx) }
-func (t Table) Schema() *iceberg.Schema { return t.metadata.CurrentSchema() }
-func (t Table) Spec() iceberg.PartitionSpec { return t.metadata.PartitionSpec() }
-func (t Table) SortOrder() SortOrder { return t.metadata.SortOrder() }
-func (t Table) Properties() iceberg.Properties { return t.metadata.Properties() }
-func (t Table) NameMapping() iceberg.NameMapping { return t.metadata.NameMapping() }
-func (t Table) Location() string { return t.metadata.Location() }
-func (t Table) CurrentSnapshot() *Snapshot { return t.metadata.CurrentSnapshot() }
-func (t Table) SnapshotByID(id int64) *Snapshot { return t.metadata.SnapshotByID(id) }
-func (t Table) SnapshotByName(name string) *Snapshot { return t.metadata.SnapshotByName(name) }
+func (t Table) Identifier() Identifier { return t.identifier }
+func (t Table) Metadata() Metadata { return t.metadata }
+func (t Table) MetadataLocation() string { return t.metadataLocation }
+func (t Table) FS(ctx context.Context) (icebergio.IO, error) { return t.fsF(ctx) }
+func (t Table) Schema() *iceberg.Schema { return t.metadata.CurrentSchema() }
+func (t Table) Spec() iceberg.PartitionSpec { return t.metadata.PartitionSpec() }
+func (t Table) SortOrder() SortOrder { return t.metadata.SortOrder() }
+func (t Table) Properties() iceberg.Properties { return t.metadata.Properties() }
+func (t Table) NameMapping() iceberg.NameMapping { return t.metadata.NameMapping() }
+func (t Table) Location() string { return t.metadata.Location() }
+func (t Table) CurrentSnapshot() *Snapshot { return t.metadata.CurrentSnapshot() }
+func (t Table) SnapshotByID(id int64) *Snapshot { return t.metadata.SnapshotByID(id) }
+func (t Table) SnapshotByName(name string) *Snapshot { return t.metadata.SnapshotByName(name) }
func (t Table) Schemas() map[int]*iceberg.Schema {
m := make(map[int]*iceberg.Schema)
for _, s := range t.metadata.Schemas() {
@@ -256,7 +260,7 @@ func getFiles(it iter.Seq[MetadataLogEntry]) iter.Seq[string] {
}
}
-func deleteOldMetadata(fs io.IO, baseMeta, newMeta Metadata) {
+func deleteOldMetadata(fs icebergio.IO, baseMeta, newMeta Metadata) {
deleteAfterCommit := newMeta.Properties().GetBool(MetadataDeleteAfterCommitEnabledKey, MetadataDeleteAfterCommitEnabledDefault)
@@ -397,12 +401,24 @@ func NewFromLocation(
if err != nil {
return nil, err
}
- if rf, ok := fsys.(io.ReadFileIO); ok {
+ if rf, ok := fsys.(icebergio.ReadFileIO); ok {
data, err := rf.ReadFile(metalocation)
if err != nil {
return nil, err
}
+ if isGzippedMetadataJson(metalocation) {
+ gz, err := gzip.NewReader(bytes.NewReader(data))
+ if err != nil {
+ return nil, err
+ }
+ defer gz.Close()
+ data, err = io.ReadAll(gz)
+ if err != nil {
+ return nil, err
+ }
+ }
+
if meta, err = ParseMetadataBytes(data); err != nil {
return nil, err
}
@@ -413,10 +429,24 @@ func NewFromLocation(
}
defer internal.CheckedClose(f, &err)
- if meta, err = ParseMetadata(f); err != nil {
+ var r io.Reader = f
+ if isGzippedMetadataJson(metalocation) {
+ gz, err := gzip.NewReader(f)
+ if err != nil {
+ return nil, err
+ }
+ defer gz.Close()
+ r = gz
+ }
+
+ if meta, err = ParseMetadata(r); err != nil {
return nil, err
}
}
return New(ident, meta, metalocation, fsysF, cat), nil
}
+
+func isGzippedMetadataJson(location string) bool {
+ return strings.HasSuffix(location, ".gz.metadata.json") || strings.HasSuffix(location, "metadata.json.gz")
+}
diff --git a/table/table_test.go b/table/table_test.go
index 7f7abc02..4e8afbd7 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -19,6 +19,7 @@ package table_test
import (
"bytes"
+ "compress/gzip"
"context"
"fmt"
"io/fs"
@@ -115,6 +116,40 @@ func (t *TableTestSuite) TestNewTableFromReadFile() {
t.True(t.tbl.Equals(*tbl2))
}
+func (t *TableTestSuite) TestNewTableFromReadFileGzipped() {
+ var b bytes.Buffer
+ gzWriter := gzip.NewWriter(&b)
+
+ _, err := gzWriter.Write([]byte(table.ExampleTableMetadataV2))
+ if err != nil {
+ log.Fatalf("Error writing to gzip writer: %v", err)
+ }
+ err = gzWriter.Close()
+ if err != nil {
+ log.Fatalf("Error closing gzip writer: %v", err)
+ }
+
+ var mockfsReadFile internal.MockFSReadFile
+ mockfsReadFile.Test(t.T())
+ mockfsReadFile.On("ReadFile", "s3://bucket/test/location/uuid.gz.metadata.json").
+ Return(b.Bytes(), nil)
+ defer mockfsReadFile.AssertExpectations(t.T())
+
+ tbl2, err := table.NewFromLocation(
+ t.T().Context(),
+ []string{"foo"},
+ "s3://bucket/test/location/uuid.gz.metadata.json",
+ func(ctx context.Context) (iceio.IO, error) {
+ return &mockfsReadFile, nil
+ },
+ nil,
+ )
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
func (t *TableTestSuite) TestSchema() {
t.True(t.tbl.Schema().Equals(iceberg.NewSchemaWithIdentifiers(1, []int{1, 2},
iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true},