This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push: new cfd2c3b fix(catalog): add rest integration write test (#352) cfd2c3b is described below commit cfd2c3ba2b61106bbbfdd1c0d045cc467c42c4e0 Author: Matt Topol <zotthewiz...@gmail.com> AuthorDate: Thu Mar 20 13:45:22 2025 -0400 fix(catalog): add rest integration write test (#352) Includes a few fixes that were found when writing the test --- catalog/rest/options.go | 9 +++++ catalog/rest/rest.go | 16 +++++++- catalog/rest/rest_integration_test.go | 74 ++++++++++++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 3 deletions(-) diff --git a/catalog/rest/options.go b/catalog/rest/options.go index 5aae122..ae1782c 100644 --- a/catalog/rest/options.go +++ b/catalog/rest/options.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "net/url" + "github.com/apache/iceberg-go" "github.com/aws/aws-sdk-go-v2/aws" ) @@ -100,6 +101,12 @@ func WithScope(scope string) Option { } } +func WithAdditionalProps(props iceberg.Properties) Option { + return func(o *options) { + o.additionalProps = props + } +} + type options struct { awsConfig aws.Config tlsConfig *tls.Config @@ -113,4 +120,6 @@ type options struct { prefix string authUri *url.URL scope string + + additionalProps iceberg.Properties } diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go index cbeed0f..5eb870f 100644 --- a/catalog/rest/rest.go +++ b/catalog/rest/rest.go @@ -377,7 +377,9 @@ func handleNon200(rsp *http.Response, override map[int]error) error { } func fromProps(props iceberg.Properties) *options { - o := &options{} + o := &options{ + additionalProps: iceberg.Properties{}, + } for k, v := range props { switch k { case keyOauthToken: @@ -411,6 +413,11 @@ func fromProps(props iceberg.Properties) *options { } else { o.tlsConfig.InsecureSkipVerify = verify } + case "uri", "type": + default: + if v != "" { + o.additionalProps[k] = v + } } } @@ -418,7 +425,12 @@ func fromProps(props iceberg.Properties) *options { } func toProps(o *options) 
iceberg.Properties { - props := iceberg.Properties{} + var props iceberg.Properties + if o.additionalProps != nil { + props = o.additionalProps + } else { + props = iceberg.Properties{} + } setIf := func(key, v string) { if v != "" { diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go index b17725a..b68a32f 100644 --- a/catalog/rest/rest_integration_test.go +++ b/catalog/rest/rest_integration_test.go @@ -21,12 +21,17 @@ package rest_test import ( "context" + "net/url" "testing" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/parquet/pqarrow" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) @@ -43,6 +48,7 @@ func (s *RestIntegrationSuite) loadCatalog(ctx context.Context) *rest.Catalog { cat, err := catalog.Load(ctx, "local", iceberg.Properties{ "type": "rest", "uri": "http://localhost:8181", + io.S3Region: "us-east-1", io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password", }) @@ -60,7 +66,7 @@ var ( tableSchemaNested = iceberg.NewSchemaWithIdentifiers(1, []int{1}, iceberg.NestedField{ - ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false}, + ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: true}, iceberg.NestedField{ ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, iceberg.NestedField{ @@ -175,6 +181,72 @@ func (s *RestIntegrationSuite) TestCreateTable() { s.Require().NoError(s.cat.DropTable(s.ctx, catalog.ToIdentifier(TestNamespaceIdent, "test-table"))) } +func (s *RestIntegrationSuite) TestWriteCommitTable() { + s.ensureNamespace() + + const location = "s3://warehouse/iceberg" + + tbl, err := s.cat.CreateTable(s.ctx, + catalog.ToIdentifier(TestNamespaceIdent, "test-table-2"), + tableSchemaNested, 
catalog.WithLocation(location)) + s.Require().NoError(err) + s.Require().NotNil(tbl) + + defer func() { + s.Require().NoError(s.cat.DropTable(s.ctx, catalog.ToIdentifier(TestNamespaceIdent, "test-table-2"))) + }() + + s.Equal(location, tbl.Location()) + + arrSchema, err := table.SchemaToArrowSchema(tableSchemaNested, nil, false, false) + s.Require().NoError(err) + + table, err := array.TableFromJSON(memory.DefaultAllocator, arrSchema, + []string{`[ + { + "foo": "foo_string", + "bar": 123, + "baz": true, + "qux": ["a", "b", "c"], + "quux": [{"key": "gopher", "value": [ + {"key": "golang", "value": "1337"}]}], + "location": [{"latitude": 37.7749, "longitude": -122.4194}], + "person": {"name": "gopher", "age": 10} + } + ]`}) + s.Require().NoError(err) + defer table.Release() + + pqfile, err := url.JoinPath(location, "data", "test_commit_table_data", "test.parquet") + s.Require().NoError(err) + + fw, err := tbl.FS().(io.WriteFileIO).Create(pqfile) + s.Require().NoError(err) + s.Require().NoError(pqarrow.WriteTable(table, fw, table.NumRows(), + nil, pqarrow.DefaultWriterProps())) + defer tbl.FS().Remove(pqfile) + + txn := tbl.NewTransaction() + s.Require().NoError(txn.AddFiles([]string{pqfile}, nil, false)) + updated, err := txn.Commit(s.ctx) + s.Require().NoError(err) + + mf := []iceberg.ManifestFile{} + for m, err := range updated.AllManifests() { + s.Require().NoError(err) + s.Require().NotNil(m) + mf = append(mf, m) + } + + s.Len(mf, 1) + s.EqualValues(1, mf[0].AddedDataFiles()) + entries, err := mf[0].FetchEntries(updated.FS(), false) + s.Require().NoError(err) + + s.Len(entries, 1) + s.Equal(pqfile, entries[0].DataFile().FilePath()) +} + func TestRestIntegration(t *testing.T) { suite.Run(t, new(RestIntegrationSuite)) }