laskoviymishka commented on code in PR #925:
URL: https://github.com/apache/iceberg-go/pull/925#discussion_r3115868624


##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1051,117 @@ func (c *Catalog) LoadView(ctx context.Context, 
identifier table.Identifier) (vi
 
        return loadedView.Metadata(), nil
 }
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// All table changes are applied within one DB transaction, guaranteeing
+// all-or-nothing semantics. Metadata files are written before the DB
+// commit; if the transaction rolls back, orphaned metadata files may
+// remain (consistent with single-table CommitTable behavior).
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits 
[]table.TableCommit) error {
+       if len(commits) == 0 {
+               return catalog.ErrEmptyCommitList
+       }
+
+       for _, commit := range commits {
+               if len(commit.Identifier) == 0 {
+                       return catalog.ErrMissingIdentifier
+               }
+       }
+
+       // Phase 1: Load current state and stage all table updates.
+       // We do this outside the write transaction to minimize the time
+       // the DB transaction is held open.
+       type stagedCommit struct {
+               ident   table.Identifier
+               current *table.Table
+               ns      string
+               tblName string
+               staged  *table.StagedTable
+       }
+
+       stagedCommits := make([]stagedCommit, 0, len(commits))
+       for _, commit := range commits {

Review Comment:
   If the same identifier shows up twice in commits, the second UPDATE's OCC 
check will match zero rows (the first already bumped metadata_location), so 
we'll roll back the whole tx and the first staged metadata file gets orphaned, 
all of it surfaced as a confusing "updated by another process" error. 
   
   Maybe worth rejecting duplicates up front with a map[string]struct{} 
keyed on the joined identifier?



##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1051,117 @@ func (c *Catalog) LoadView(ctx context.Context, 
identifier table.Identifier) (vi
 
        return loadedView.Metadata(), nil
 }
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// All table changes are applied within one DB transaction, guaranteeing
+// all-or-nothing semantics. Metadata files are written before the DB
+// commit; if the transaction rolls back, orphaned metadata files may
+// remain (consistent with single-table CommitTable behavior).
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits 
[]table.TableCommit) error {
+       if len(commits) == 0 {
+               return catalog.ErrEmptyCommitList
+       }
+
+       for _, commit := range commits {
+               if len(commit.Identifier) == 0 {
+                       return catalog.ErrMissingIdentifier
+               }
+       }
+
+       // Phase 1: Load current state and stage all table updates.
+       // We do this outside the write transaction to minimize the time
+       // the DB transaction is held open.
+       type stagedCommit struct {
+               ident   table.Identifier
+               current *table.Table
+               ns      string
+               tblName string
+               staged  *table.StagedTable
+       }
+
+       stagedCommits := make([]stagedCommit, 0, len(commits))
+       for _, commit := range commits {
+               ns := catalog.NamespaceFromIdent(commit.Identifier)
+               tblName := catalog.TableNameFromIdent(commit.Identifier)
+
+               current, err := c.LoadTable(ctx, commit.Identifier)
+               if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) {
+                       return err
+               }
+
+               staged, err := internal.UpdateAndStageTable(ctx, current, 
commit.Identifier, commit.Requirements, commit.Updates, c)
+               if err != nil {
+                       return err
+               }
+
+               // Skip tables with no actual changes.
+               if current != nil && 
staged.Metadata().Equals(current.Metadata()) {
+                       continue
+               }
+
+               // Write the metadata file before the DB transaction.
+               if err := internal.WriteMetadata(ctx, staged.Metadata(), 
staged.MetadataLocation(), staged.Properties()); err != nil {
+                       return err
+               }
+
+               stagedCommits = append(stagedCommits, stagedCommit{
+                       ident:   commit.Identifier,
+                       current: current,
+                       ns:      strings.Join(ns, "."),
+                       tblName: tblName,
+                       staged:  staged,
+               })
+       }
+
+       if len(stagedCommits) == 0 {
+               return nil // all tables had no changes
+       }
+
+       // Phase 2: Apply all DB changes atomically in a single transaction.
+       return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
error {
+               for _, sc := range stagedCommits {
+                       if sc.current != nil {
+                               res, err := 
tx.NewUpdate().Model(&sqlIcebergTable{
+                                       CatalogName:              c.name,
+                                       TableNamespace:           sc.ns,
+                                       TableName:                sc.tblName,
+                                       IcebergType:              TableType,
+                                       MetadataLocation:         
sql.NullString{Valid: true, String: sc.staged.MetadataLocation()},
+                                       PreviousMetadataLocation: 
sql.NullString{Valid: true, String: sc.current.MetadataLocation()},
+                               }).WherePK().Where("metadata_location = ?", 
sc.current.MetadataLocation()).
+                                       Where("iceberg_type = ?", TableType).
+                                       Exec(ctx)
+                               if err != nil {
+                                       return fmt.Errorf("error updating table 
information: %w", err)
+                               }
+
+                               n, err := res.RowsAffected()
+                               if err != nil {
+                                       return fmt.Errorf("error updating table 
information: %w", err)
+                               }
+
+                               if n == 0 {
+                                       return fmt.Errorf("table has been 
updated by another process: %s.%s", sc.ns, sc.tblName)
+                               }
+                       } else {
+                               _, err := tx.NewInsert().Model(&sqlIcebergTable{

Review Comment:
   On a PK collision here (a concurrent create of the same table), we lose the 
`catalog.ErrTableAlreadyExists` signal that `CreateTable` and Java's 
`JdbcCatalog.doCommitCreate` both surface. 
   
   Since `MultiTableTransaction` only calls this on already-loaded tables in 
practice, maybe it's simpler to reject `current == nil` explicitly and leave 
creation to `CreateTable`? 
   
   Otherwise we need to detect the duplicate-key case per driver, which is 
messier.



##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1051,117 @@ func (c *Catalog) LoadView(ctx context.Context, 
identifier table.Identifier) (vi
 
        return loadedView.Metadata(), nil
 }
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// All table changes are applied within one DB transaction, guaranteeing
+// all-or-nothing semantics. Metadata files are written before the DB
+// commit; if the transaction rolls back, orphaned metadata files may
+// remain (consistent with single-table CommitTable behavior).
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits 
[]table.TableCommit) error {
+       if len(commits) == 0 {
+               return catalog.ErrEmptyCommitList
+       }
+
+       for _, commit := range commits {
+               if len(commit.Identifier) == 0 {
+                       return catalog.ErrMissingIdentifier
+               }
+       }
+
+       // Phase 1: Load current state and stage all table updates.
+       // We do this outside the write transaction to minimize the time
+       // the DB transaction is held open.
+       type stagedCommit struct {
+               ident   table.Identifier
+               current *table.Table
+               ns      string
+               tblName string
+               staged  *table.StagedTable
+       }
+
+       stagedCommits := make([]stagedCommit, 0, len(commits))
+       for _, commit := range commits {
+               ns := catalog.NamespaceFromIdent(commit.Identifier)
+               tblName := catalog.TableNameFromIdent(commit.Identifier)
+
+               current, err := c.LoadTable(ctx, commit.Identifier)
+               if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) {
+                       return err
+               }
+
+               staged, err := internal.UpdateAndStageTable(ctx, current, 
commit.Identifier, commit.Requirements, commit.Updates, c)
+               if err != nil {
+                       return err
+               }
+
+               // Skip tables with no actual changes.
+               if current != nil && 
staged.Metadata().Equals(current.Metadata()) {
+                       continue
+               }
+
+               // Write the metadata file before the DB transaction.
+               if err := internal.WriteMetadata(ctx, staged.Metadata(), 
staged.MetadataLocation(), staged.Properties()); err != nil {
+                       return err
+               }
+
+               stagedCommits = append(stagedCommits, stagedCommit{
+                       ident:   commit.Identifier,
+                       current: current,
+                       ns:      strings.Join(ns, "."),
+                       tblName: tblName,
+                       staged:  staged,
+               })
+       }
+
+       if len(stagedCommits) == 0 {
+               return nil // all tables had no changes
+       }
+
+       // Phase 2: Apply all DB changes atomically in a single transaction.
+       return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
error {
+               for _, sc := range stagedCommits {
+                       if sc.current != nil {
+                               res, err := 
tx.NewUpdate().Model(&sqlIcebergTable{
+                                       CatalogName:              c.name,
+                                       TableNamespace:           sc.ns,
+                                       TableName:                sc.tblName,
+                                       IcebergType:              TableType,
+                                       MetadataLocation:         
sql.NullString{Valid: true, String: sc.staged.MetadataLocation()},
+                                       PreviousMetadataLocation: 
sql.NullString{Valid: true, String: sc.current.MetadataLocation()},
+                               }).WherePK().Where("metadata_location = ?", 
sc.current.MetadataLocation()).
+                                       Where("iceberg_type = ?", TableType).
+                                       Exec(ctx)
+                               if err != nil {
+                                       return fmt.Errorf("error updating table 
information: %w", err)
+                               }
+
+                               n, err := res.RowsAffected()
+                               if err != nil {
+                                       return fmt.Errorf("error updating table 
information: %w", err)
+                               }
+
+                               if n == 0 {
+                                       return fmt.Errorf("table has been 
updated by another process: %s.%s", sc.ns, sc.tblName)

Review Comment:
   I think wrapping this in a typed sentinel (say catalog.ErrCommitConflict, or 
reusing something shared with REST) would let MultiTableTransaction and future 
retry wrappers do errors.Is across backends.
   
   Right now REST returns rest.ErrCommitFailed-wrapped errors and SQL returns a 
stringly-typed fmt.Errorf, so callers can't distinguish "conflict, retry me" 
from "I/O failure" portably. 
   
   Worth doing here so we don't bake divergent error contracts into each catalog.



##########
catalog/sql/sql_test.go:
##########
@@ -1221,6 +1222,160 @@ func (s *SqliteCatalogTestSuite) TestLoadView() {
        s.ErrorIs(err, catalog.ErrNoSuchView)
 }
 
+func (s *SqliteCatalogTestSuite) TestCommitTransactionInterface() {
+       cat := s.getCatalogMemory()
+       _, ok := any(cat).(catalog.TransactionalCatalog)
+       s.True(ok, "sql.Catalog must implement catalog.TransactionalCatalog")
+}
+
+func (s *SqliteCatalogTestSuite) TestCommitTransactionEmptyList() {
+       cat := s.getCatalogMemory()
+       err := cat.CommitTransaction(context.Background(), nil)
+       s.ErrorIs(err, catalog.ErrEmptyCommitList)
+
+       err = cat.CommitTransaction(context.Background(), []table.TableCommit{})
+       s.ErrorIs(err, catalog.ErrEmptyCommitList)
+}
+
+func (s *SqliteCatalogTestSuite) TestCommitTransactionMissingIdentifier() {
+       cat := s.getCatalogMemory()
+       err := cat.CommitTransaction(context.Background(), []table.TableCommit{
+               {Identifier: table.Identifier{}},
+       })
+       s.ErrorIs(err, catalog.ErrMissingIdentifier)
+}
+
+func (s *SqliteCatalogTestSuite) TestCommitTransactionSetProperties() {
+       catalogs := []*sqlcat.Catalog{s.getCatalogMemory(), 
s.getCatalogSqlite()}
+
+       ctx := context.Background()
+       for _, cat := range catalogs {
+               tbl1ID := s.randomTableIdentifier()
+               tbl2ID := s.randomTableIdentifier()
+               ns1 := catalog.NamespaceFromIdent(tbl1ID)
+               ns2 := catalog.NamespaceFromIdent(tbl2ID)
+               s.Require().NoError(cat.CreateNamespace(ctx, ns1, nil))
+               if strings.Join(ns1, ".") != strings.Join(ns2, ".") {
+                       s.Require().NoError(cat.CreateNamespace(ctx, ns2, nil))
+               }
+
+               tbl1, err := cat.CreateTable(ctx, tbl1ID, tableSchemaNested)
+               s.Require().NoError(err)
+               tbl2, err := cat.CreateTable(ctx, tbl2ID, tableSchemaNested)
+               s.Require().NoError(err)
+
+               // Build transactions on each table.
+               tx1 := tbl1.NewTransaction()
+               
s.Require().NoError(tx1.SetProperties(map[string]string{"pipeline": "v2"}))
+               tc1, err := tx1.TableCommit()
+               s.Require().NoError(err)
+
+               tx2 := tbl2.NewTransaction()
+               
s.Require().NoError(tx2.SetProperties(map[string]string{"pipeline": "v2"}))
+               tc2, err := tx2.TableCommit()
+               s.Require().NoError(err)
+
+               // Commit atomically.
+               s.Require().NoError(cat.CommitTransaction(ctx, 
[]table.TableCommit{tc1, tc2}))
+
+               // Verify both tables have updated properties.
+               loaded1, err := cat.LoadTable(ctx, tbl1ID)
+               s.Require().NoError(err)
+               s.Equal("v2", loaded1.Properties()["pipeline"])
+
+               loaded2, err := cat.LoadTable(ctx, tbl2ID)
+               s.Require().NoError(err)
+               s.Equal("v2", loaded2.Properties()["pipeline"])
+       }
+}
+
+func (s *SqliteCatalogTestSuite) TestCommitTransactionPartialRollback() {

Review Comment:
   I think this test actually fails during Phase 1 staging (the bogus 
AssertTableUUID is rejected before we ever enter withWriteTx), so the DB 
rollback path the name promises never runs. 
   
   Worth adding one that forces a Phase-2 failure after at least one `UPDATE` 
has landed in the tx, e.g. mutate one table's `metadata_location` via raw SQL 
between stage and commit so the second `UPDATE` sees zero rows. 
   
   That's the invariant the PR is adding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to