This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new dafadaa6 feat(cli): add branch create and tag create commands (#1068)
dafadaa6 is described below
commit dafadaa60719292c1f3e303b9db0519e5d13dd8e
Author: Tanmay Rauth <[email protected]>
AuthorDate: Wed May 13 15:19:51 2026 -0700
feat(cli): add branch create and tag create commands (#1068)
Add `iceberg branch create TABLE_ID NAME` and `iceberg tag create
TABLE_ID NAME` with snapshot ref retention options.
Related: #957
Depends On: #1073
---
cmd/iceberg/branch_tag.go | 187 ++++++++++++++++++++++++++++++++++++--
cmd/iceberg/branch_tag_test.go | 197 +++++++++++++++++++++++++++++++++++++++++
cmd/iceberg/maintenance.go | 13 ++-
3 files changed, 385 insertions(+), 12 deletions(-)
diff --git a/cmd/iceberg/branch_tag.go b/cmd/iceberg/branch_tag.go
index 967b9bc4..dcaa9951 100644
--- a/cmd/iceberg/branch_tag.go
+++ b/cmd/iceberg/branch_tag.go
@@ -19,21 +19,192 @@ package main
import (
"context"
+ "encoding/json"
"errors"
+ "fmt"
"os"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/pterm/pterm"
)
-func runBranch(_ context.Context, output Output, _ catalog.Catalog, _ *BranchCmd) {
- output.Error(errors.New("branch: not yet implemented"))
- os.Exit(1)
+func runBranch(ctx context.Context, output Output, cat catalog.Catalog, cmd *BranchCmd) {
+ switch {
+ case cmd.Create != nil:
+ runBranchCreate(ctx, output, cat, cmd.Create)
+ }
}
-func runTag(_ context.Context, output Output, _ catalog.Catalog, _ *TagCmd) {
- output.Error(errors.New("tag: not yet implemented"))
- os.Exit(1)
+func runTag(ctx context.Context, output Output, cat catalog.Catalog, cmd *TagCmd) {
+ switch {
+ case cmd.Create != nil:
+ runTagCreate(ctx, output, cat, cmd.Create)
+ }
}
-func (textOutput) RefCreated(_ RefCreatedResult) {}
-func (jsonOutput) RefCreated(_ RefCreatedResult) {}
+func runBranchCreate(ctx context.Context, output Output, cat catalog.Catalog, cmd *BranchCreateCmd) {
+ tbl := loadTable(ctx, output, cat, cmd.TableID)
+ meta := tbl.Metadata()
+
+ for name := range meta.Refs() {
+ if name == cmd.BranchName {
+ output.Error(fmt.Errorf("ref %q already exists", cmd.BranchName))
+ os.Exit(1)
+ }
+ }
+
+ snapshotID := resolveSnapshotID(output, tbl, cmd.SnapshotID)
+
+ if err := confirmAction(
+ fmt.Sprintf("Create branch %q on %s at snapshot %d?", cmd.BranchName, tableIDString(tbl), snapshotID),
+ cmd.Yes,
+ ); err != nil {
+ output.Error(err)
+ os.Exit(1)
+ }
+
+ var maxRefAgeMs int64
+ if cmd.MaxRefAge != "" {
+ d, err := parseDuration(cmd.MaxRefAge)
+ if err != nil {
+ output.Error(fmt.Errorf("invalid --max-ref-age: %w", err))
+ os.Exit(1)
+ }
+
+ maxRefAgeMs = d.Milliseconds()
+ }
+
+ var maxSnapshotAgeMs int64
+ if cmd.MaxSnapshotAge != "" {
+ d, err := parseDuration(cmd.MaxSnapshotAge)
+ if err != nil {
+ output.Error(fmt.Errorf("invalid --max-snapshot-age: %w", err))
+ os.Exit(1)
+ }
+
+ maxSnapshotAgeMs = d.Milliseconds()
+ }
+
+ var minSnapshotsToKeep int
+ if cmd.MinSnapshotsToKeep != nil {
+ minSnapshotsToKeep = *cmd.MinSnapshotsToKeep
+ }
+
+ update := table.NewSetSnapshotRefUpdate(cmd.BranchName, snapshotID, table.BranchRef,
+ maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep)
+ reqs := []table.Requirement{
+ table.AssertTableUUID(meta.TableUUID()),
+ table.AssertRefSnapshotID(cmd.BranchName, nil),
+ }
+
+ if _, _, err := cat.CommitTable(ctx, tbl.Identifier(), reqs, []table.Update{update}); err != nil {
+ output.Error(fmt.Errorf("failed to create branch: %w", err))
+ os.Exit(1)
+ }
+
+ result := RefCreatedResult{
+ Table: tableIDString(tbl),
+ RefName: cmd.BranchName,
+ RefType: string(table.BranchRef),
+ SnapshotID: snapshotID,
+ }
+ if maxRefAgeMs > 0 {
+ result.MaxRefAgeMs = &maxRefAgeMs
+ }
+ if maxSnapshotAgeMs > 0 {
+ result.MaxSnapshotAgeMs = &maxSnapshotAgeMs
+ }
+ if minSnapshotsToKeep > 0 {
+ result.MinSnapshotsToKeep = &minSnapshotsToKeep
+ }
+
+ output.RefCreated(result)
+}
+
+func runTagCreate(ctx context.Context, output Output, cat catalog.Catalog, cmd *TagCreateCmd) {
+ tbl := loadTable(ctx, output, cat, cmd.TableID)
+ meta := tbl.Metadata()
+
+ for name := range meta.Refs() {
+ if name == cmd.TagName {
+ output.Error(fmt.Errorf("ref %q already exists", cmd.TagName))
+ os.Exit(1)
+ }
+ }
+
+ snapshotID := resolveSnapshotID(output, tbl, cmd.SnapshotID)
+
+ if err := confirmAction(
+ fmt.Sprintf("Create tag %q on %s at snapshot %d?", cmd.TagName, tableIDString(tbl), snapshotID),
+ cmd.Yes,
+ ); err != nil {
+ output.Error(err)
+ os.Exit(1)
+ }
+
+ var maxRefAgeMs int64
+ if cmd.MaxRefAge != "" {
+ d, err := parseDuration(cmd.MaxRefAge)
+ if err != nil {
+ output.Error(fmt.Errorf("invalid --max-ref-age: %w", err))
+ os.Exit(1)
+ }
+
+ maxRefAgeMs = d.Milliseconds()
+ }
+
+ update := table.NewSetSnapshotRefUpdate(cmd.TagName, snapshotID, table.TagRef,
+ maxRefAgeMs, 0, 0)
+ reqs := []table.Requirement{
+ table.AssertTableUUID(meta.TableUUID()),
+ table.AssertRefSnapshotID(cmd.TagName, nil),
+ }
+
+ if _, _, err := cat.CommitTable(ctx, tbl.Identifier(), reqs, []table.Update{update}); err != nil {
+ output.Error(fmt.Errorf("failed to create tag: %w", err))
+ os.Exit(1)
+ }
+
+ result := RefCreatedResult{
+ Table: tableIDString(tbl),
+ RefName: cmd.TagName,
+ RefType: string(table.TagRef),
+ SnapshotID: snapshotID,
+ }
+ if maxRefAgeMs > 0 {
+ result.MaxRefAgeMs = &maxRefAgeMs
+ }
+
+ output.RefCreated(result)
+}
+
+func resolveSnapshotID(output Output, tbl *table.Table, explicit *int64) int64 {
+ if explicit != nil {
+ if tbl.Metadata().SnapshotByID(*explicit) == nil {
+ output.Error(fmt.Errorf("snapshot %d not found", *explicit))
+ os.Exit(1)
+ }
+
+ return *explicit
+ }
+
+ snap := tbl.Metadata().CurrentSnapshot()
+ if snap == nil {
+ output.Error(errors.New("table has no current snapshot; specify --snapshot-id explicitly"))
+ os.Exit(1)
+ }
+
+ return snap.SnapshotID
+}
+
+func (t textOutput) RefCreated(result RefCreatedResult) {
+ pterm.Printfln("Created %s %q on %s at snapshot %d.",
+ result.RefType, result.RefName, result.Table, result.SnapshotID)
+}
+
+func (j jsonOutput) RefCreated(result RefCreatedResult) {
+ if err := json.NewEncoder(os.Stdout).Encode(result); err != nil {
+ j.Error(err)
+ }
+}
diff --git a/cmd/iceberg/branch_tag_test.go b/cmd/iceberg/branch_tag_test.go
new file mode 100644
index 00000000..7c93750c
--- /dev/null
+++ b/cmd/iceberg/branch_tag_test.go
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "os"
+ "testing"
+
+ "github.com/apache/iceberg-go/table"
+ "github.com/pterm/pterm"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+const branchTagTestMetadata = `{
+ "format-version": 2,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 1,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 1,
+ "current-schema-id": 0,
+ "schemas": [
+ {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}
+ ],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "last-partition-id": 0,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {},
+ "current-snapshot-id": 5000,
+ "snapshots": [
+ {"snapshot-id": 5000, "timestamp-ms": 1615100955770, "sequence-number": 1, "summary": {"operation": "append"}, "manifest-list": "s3://a/b/1.avro", "schema-id": 0}
+ ],
+ "snapshot-log": [],
+ "metadata-log": [],
+ "refs": {"main": {"snapshot-id": 5000, "type": "branch"}}
+}`
+
+func TestResolveSnapshotIDExplicit(t *testing.T) {
+ meta, err := table.ParseMetadataBytes([]byte(branchTagTestMetadata))
+ require.NoError(t, err)
+
+ tbl := table.New([]string{"db", "tbl"}, meta, "", nil, nil)
+ explicit := int64(5000)
+
+ result := resolveSnapshotID(textOutput{}, tbl, &explicit)
+ assert.Equal(t, int64(5000), result)
+}
+
+func TestResolveSnapshotIDCurrent(t *testing.T) {
+ meta, err := table.ParseMetadataBytes([]byte(branchTagTestMetadata))
+ require.NoError(t, err)
+
+ tbl := table.New([]string{"db", "tbl"}, meta, "", nil, nil)
+
+ result := resolveSnapshotID(textOutput{}, tbl, nil)
+ assert.Equal(t, int64(5000), result)
+}
+
+func TestTextOutputRefCreated(t *testing.T) {
+ var buf bytes.Buffer
+ pterm.SetDefaultOutput(&buf)
+ pterm.DisableColor()
+ t.Cleanup(func() {
+ pterm.SetDefaultOutput(os.Stderr)
+ pterm.EnableColor()
+ })
+
+ result := RefCreatedResult{
+ Table: "db.events",
+ RefName: "feature-branch",
+ RefType: "branch",
+ SnapshotID: 5000,
+ }
+
+ buf.Reset()
+ textOutput{}.RefCreated(result)
+
+ output := buf.String()
+ assert.Contains(t, output, "Created branch")
+ assert.Contains(t, output, "feature-branch")
+ assert.Contains(t, output, "db.events")
+ assert.Contains(t, output, "5000")
+}
+
+func TestTextOutputRefCreatedTag(t *testing.T) {
+ var buf bytes.Buffer
+ pterm.SetDefaultOutput(&buf)
+ pterm.DisableColor()
+ t.Cleanup(func() {
+ pterm.SetDefaultOutput(os.Stderr)
+ pterm.EnableColor()
+ })
+
+ result := RefCreatedResult{
+ Table: "db.events",
+ RefName: "v1.0",
+ RefType: "tag",
+ SnapshotID: 3000,
+ }
+
+ buf.Reset()
+ textOutput{}.RefCreated(result)
+
+ output := buf.String()
+ assert.Contains(t, output, "Created tag")
+ assert.Contains(t, output, "v1.0")
+ assert.Contains(t, output, "3000")
+}
+
+func TestJSONOutputRefCreated(t *testing.T) {
+ r, w, err := os.Pipe()
+ require.NoError(t, err)
+
+ oldStdout := os.Stdout
+ os.Stdout = w
+ t.Cleanup(func() {
+ w.Close()
+ os.Stdout = oldStdout
+ })
+
+ result := RefCreatedResult{
+ Table: "db.events",
+ RefName: "feature-branch",
+ RefType: "branch",
+ SnapshotID: 5000,
+ }
+
+ jsonOutput{}.RefCreated(result)
+
+ w.Close()
+
+ var buf bytes.Buffer
+ _, _ = buf.ReadFrom(r)
+
+ output := buf.String()
+ assert.Contains(t, output, `"table":"db.events"`)
+ assert.Contains(t, output, `"ref_name":"feature-branch"`)
+ assert.Contains(t, output, `"ref_type":"branch"`)
+ assert.Contains(t, output, `"snapshot_id":5000`)
+}
+
+func TestJSONOutputRefCreatedWithRetention(t *testing.T) {
+ r, w, err := os.Pipe()
+ require.NoError(t, err)
+
+ oldStdout := os.Stdout
+ os.Stdout = w
+ t.Cleanup(func() {
+ w.Close()
+ os.Stdout = oldStdout
+ })
+
+ maxRefAge := int64(604800000)
+ maxSnapshotAge := int64(86400000)
+ minSnapshots := 5
+
+ result := RefCreatedResult{
+ Table: "db.events",
+ RefName: "feature-branch",
+ RefType: "branch",
+ SnapshotID: 5000,
+ MaxRefAgeMs: &maxRefAge,
+ MaxSnapshotAgeMs: &maxSnapshotAge,
+ MinSnapshotsToKeep: &minSnapshots,
+ }
+
+ jsonOutput{}.RefCreated(result)
+
+ w.Close()
+
+ var buf bytes.Buffer
+ _, _ = buf.ReadFrom(r)
+
+ output := buf.String()
+ assert.Contains(t, output, `"max_ref_age_ms":604800000`)
+ assert.Contains(t, output, `"max_snapshot_age_ms":86400000`)
+ assert.Contains(t, output, `"min_snapshots_to_keep":5`)
+}
diff --git a/cmd/iceberg/maintenance.go b/cmd/iceberg/maintenance.go
index 7a0f5e43..d588b16a 100644
--- a/cmd/iceberg/maintenance.go
+++ b/cmd/iceberg/maintenance.go
@@ -86,6 +86,7 @@ type BranchCreateCmd struct {
MaxRefAge string `arg:"--max-ref-age" help:"max ref age duration (e.g. 7d, 168h)"`
MaxSnapshotAge string `arg:"--max-snapshot-age" help:"max snapshot age duration"`
MinSnapshotsToKeep *int `arg:"--min-snapshots-to-keep" help:"minimum snapshots to keep"`
+ Yes bool `arg:"--yes" help:"skip confirmation prompt"`
}
type BranchCmd struct {
@@ -97,6 +98,7 @@ type TagCreateCmd struct {
TagName string `arg:"positional,required" help:"tag name"`
SnapshotID *int64 `arg:"--snapshot-id" help:"snapshot ID (defaults to current snapshot)"`
MaxRefAge string `arg:"--max-ref-age" help:"max ref age duration (e.g. 7d, 168h)"`
+ Yes bool `arg:"--yes" help:"skip confirmation prompt"`
}
type TagCmd struct {
@@ -173,10 +175,13 @@ type RollbackResult struct {
}
type RefCreatedResult struct {
- Table string `json:"table"`
- RefName string `json:"ref_name"`
- RefType string `json:"ref_type"`
- SnapshotID int64 `json:"snapshot_id"`
+ Table string `json:"table"`
+ RefName string `json:"ref_name"`
+ RefType string `json:"ref_type"`
+ SnapshotID int64 `json:"snapshot_id"`
+ MaxRefAgeMs *int64 `json:"max_ref_age_ms,omitempty"`
+ MaxSnapshotAgeMs *int64 `json:"max_snapshot_age_ms,omitempty"`
+ MinSnapshotsToKeep *int `json:"min_snapshots_to_keep,omitempty"`
}
// Shared helpers