laskoviymishka commented on code in PR #925:
URL: https://github.com/apache/iceberg-go/pull/925#discussion_r3129724446
##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1052,126 @@ func (c *Catalog) LoadView(ctx context.Context,
identifier table.Identifier) (vi
return loadedView.Metadata(), nil
}
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// This method provides "commit-point atomicity". To avoid long-running DB
+// transactions involving I/O (like uploading metadata files), table state
+// is loaded and metadata files are written outside the write transaction.
+// Atomicity is guaranteed at the commit point within the database; concurrent
+// updates are detected using per-row optimistic concurrency control (OCC)
+// based on the metadata_location.
+//
+// If any table update fails due to an OCC conflict or database error, the
+// entire transaction is rolled back, ensuring all-or-nothing database updates.
+// Note that metadata files written during the preparation phase may remain
+// as orphaned files if the transaction is rolled back.
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits
[]table.TableCommit) error {
+ if len(commits) == 0 {
+ return catalog.ErrEmptyCommitList
+ }
+
+ seen := make(map[string]struct{})
+
+ for _, commit := range commits {
+ if len(commit.Identifier) == 0 {
+ return catalog.ErrMissingIdentifier
+ }
+
+ key := strings.Join(commit.Identifier, ".")
+ if _, ok := seen[key]; ok {
+ return fmt.Errorf("duplicate table in commit list: %s",
key)
+ }
+ seen[key] = struct{}{}
+ }
+
+ // Phase 1: Load current state and stage all table updates.
+ // We do this outside the write transaction to minimize the time
+ // the DB transaction is held open.
+ type stagedCommit struct {
+ ident table.Identifier
+ current *table.Table
+ ns string
+ tblName string
+ staged *table.StagedTable
+ }
+
+ stagedCommits := make([]stagedCommit, 0, len(commits))
+ for _, commit := range commits {
+ ns := catalog.NamespaceFromIdent(commit.Identifier)
+ tblName := catalog.TableNameFromIdent(commit.Identifier)
+
+ current, err := c.LoadTable(ctx, commit.Identifier)
+ if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) {
Review Comment:
This swallows ErrNoSuchTable and leaves current = nil. But Phase 2 no longer
guards against nil: sc.current.MetadataLocation() will panic.
CommitTransaction is for updating existing tables, not creating. Drop the
errors.Is branch:
```
if err != nil {
return err
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]