xixipi-lining commented on code in PR #925:
URL: https://github.com/apache/iceberg-go/pull/925#discussion_r3129479783


##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1051,117 @@ func (c *Catalog) LoadView(ctx context.Context, 
identifier table.Identifier) (vi
 
        return loadedView.Metadata(), nil
 }
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// All table changes are applied within one DB transaction, guaranteeing
+// all-or-nothing semantics. Metadata files are written before the DB
+// commit; if the transaction rolls back, orphaned metadata files may
+// remain (consistent with single-table CommitTable behavior).
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits 
[]table.TableCommit) error {
+       if len(commits) == 0 {
+               return catalog.ErrEmptyCommitList
+       }
+
+       for _, commit := range commits {
+               if len(commit.Identifier) == 0 {
+                       return catalog.ErrMissingIdentifier
+               }
+       }
+
+       // Phase 1: Load current state and stage all table updates.
+       // We do this outside the write transaction to minimize the time
+       // the DB transaction is held open.
+       type stagedCommit struct {
+               ident   table.Identifier
+               current *table.Table
+               ns      string
+               tblName string
+               staged  *table.StagedTable
+       }
+
+       stagedCommits := make([]stagedCommit, 0, len(commits))
+       for _, commit := range commits {
+               ns := catalog.NamespaceFromIdent(commit.Identifier)
+               tblName := catalog.TableNameFromIdent(commit.Identifier)
+
+               current, err := c.LoadTable(ctx, commit.Identifier)
+               if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) {
+                       return err
+               }
+
+               staged, err := internal.UpdateAndStageTable(ctx, current, 
commit.Identifier, commit.Requirements, commit.Updates, c)
+               if err != nil {
+                       return err
+               }
+
+               // Skip tables with no actual changes.
+               if current != nil && 
staged.Metadata().Equals(current.Metadata()) {
+                       continue
+               }
+
+               // Write the metadata file before the DB transaction.
+               if err := internal.WriteMetadata(ctx, staged.Metadata(), 
staged.MetadataLocation(), staged.Properties()); err != nil {
+                       return err
+               }
+
+               stagedCommits = append(stagedCommits, stagedCommit{
+                       ident:   commit.Identifier,
+                       current: current,
+                       ns:      strings.Join(ns, "."),
+                       tblName: tblName,
+                       staged:  staged,
+               })
+       }
+
+       if len(stagedCommits) == 0 {
+               return nil // all tables had no changes
+       }
+
+       // Phase 2: Apply all DB changes atomically in a single transaction.
+       return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
error {
+               for _, sc := range stagedCommits {
+                       if sc.current != nil {
+                               res, err := 
tx.NewUpdate().Model(&sqlIcebergTable{
+                                       CatalogName:              c.name,
+                                       TableNamespace:           sc.ns,
+                                       TableName:                sc.tblName,
+                                       IcebergType:              TableType,
+                                       MetadataLocation:         
sql.NullString{Valid: true, String: sc.staged.MetadataLocation()},
+                                       PreviousMetadataLocation: 
sql.NullString{Valid: true, String: sc.current.MetadataLocation()},
+                               }).WherePK().Where("metadata_location = ?", 
sc.current.MetadataLocation()).
+                                       Where("iceberg_type = ?", TableType).
+                                       Exec(ctx)
+                               if err != nil {
+                                       return fmt.Errorf("error updating table 
information: %w", err)
+                               }
+
+                               n, err := res.RowsAffected()
+                               if err != nil {
+                                       return fmt.Errorf("error updating table 
information: %w", err)
+                               }
+
+                               if n == 0 {
+                                       return fmt.Errorf("table has been 
updated by another process: %s.%s", sc.ns, sc.tblName)

Review Comment:
   Great point. I’ve added catalog.ErrCommitFailed to align with Java/Python 
and ensure a consistent error contract. Now the SQL catalog wraps failures with 
this error, making it portable for errors.Is checks
     across different backends.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to