This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f60b1b  feat(catalog): Initial implementation of sql catalog (#246)
1f60b1b is described below

commit 1f60b1bac0c57ba05bfc887c229f4a16c185d749
Author: Matt Topol <[email protected]>
AuthorDate: Wed Jan 29 16:35:23 2025 -0500

    feat(catalog): Initial implementation of sql catalog (#246)
    
    Building on #266 (which would need to be merged before this), this
    creates an initial implementation of a SQL catalog.
    
    This uses the https://bun.uptrace.dev/ ORM to dispatch queries among
    the SQLite, Postgres, Oracle, MySQL, and MSSQL dialects, but the catalog
    accepts a standard `*sql.DB` object and custom drivers rather than
    bundling drivers directly into this package. That provides extra
    flexibility as long as the desired database supports one of the
    implemented SQL dialects.
---
 README.md                 |  31 +-
 catalog/catalog.go        |  49 +++
 catalog/glue/glue.go      |  53 +--
 catalog/internal/utils.go |  30 ++
 catalog/sql/sql.go        | 721 +++++++++++++++++++++++++++++++++++
 catalog/sql/sql_test.go   | 945 ++++++++++++++++++++++++++++++++++++++++++++++
 go.mod                    |  29 +-
 go.sum                    |  72 +++-
 types.go                  |  11 +
 9 files changed, 1873 insertions(+), 68 deletions(-)

diff --git a/README.md b/README.md
index ea26dd1..5303c07 100644
--- a/README.md
+++ b/README.md
@@ -60,22 +60,21 @@ $ cd iceberg-go/cmd/iceberg && go build .
 
 ### Catalog Support
 
-| Operation                | REST | Hive | DynamoDB | Glue |
-|:-------------------------|:----:| :--: | :------: |:----:|
-| Load Table               |  X   |      |          |  X   |
-| List Tables              |  X   |      |          |  X   |
-| Create Table             |  X   |      |          |  X   |
-| Update Current Snapshot  |      |      |          |      |
-| Create New Snapshot      |      |      |          |      |
-| Rename Table             |  X   |      |          |  X   |
-| Drop Table               |  X   |      |          |  X   |
-| Alter Table              |      |      |          |      |
-| Check Table Exists       |  X   |      |          |      |
-| Set Table Properties     |  X   |      |          |  X   |
-| Create Namespace         |  X   |      |          |  X   |
-| Check Namespace Exists   |  X   |      |          |      |
-| Drop Namespace           |  X   |      |          |  X   |
-| Set Namespace Properties |  X   |      |          |  X   |
+| Operation                | REST | Hive | DynamoDB | Glue | SQL |
+|:-------------------------|:----:| :--: | :------: |:----:|:---:|
+| Load Table               |  X   |      |          |  X   |  X  |
+| List Tables              |  X   |      |          |  X   |  X  |
+| Create Table             |  X   |      |          |  X   |  X  |
+| Update Current Snapshot  |      |      |          |      |     |
+| Create New Snapshot      |      |      |          |      |     |
+| Rename Table             |  X   |      |          |  X   |  X  |
+| Drop Table               |  X   |      |          |  X   |  X  |
+| Alter Table              |      |      |          |      |  X  |
+| Set Table Properties     |  X   |      |          |      |  X  |
+| Create Namespace         |  X   |      |          |  X   |  X  |
+| Check Namespace Exists   |  X   |      |          |      |  X  |
+| Drop Namespace           |  X   |      |          |  X   |  X  |
+| Set Namespace Properties |  X   |      |          |  X   |  X  |
 
 ### Read/Write Data Support
 
diff --git a/catalog/catalog.go b/catalog/catalog.go
index 22cce9a..db14dab 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -30,6 +30,8 @@ package catalog
 import (
        "context"
        "errors"
+       "fmt"
+       "maps"
        "strings"
 
        "github.com/apache/iceberg-go"
@@ -147,3 +149,50 @@ func WithProperties(props iceberg.Properties) 
CreateTableOpt {
                cfg.Properties = props
        }
 }
+
+// checkForOverlap returns an error naming every key that appears both in
+// removals and in updates; it returns nil when the two sets are disjoint.
+//
+//lint:ignore U1000 this is linked to by catalogs via go:linkname but we don't want to export it
+func checkForOverlap(removals []string, updates iceberg.Properties) error {
+       var conflicts []string
+       for _, k := range removals {
+               if _, dup := updates[k]; dup {
+                       conflicts = append(conflicts, k)
+               }
+       }
+
+       if len(conflicts) == 0 {
+               return nil
+       }
+       return fmt.Errorf("conflict between removals and updates for keys: %v", conflicts)
+}
+
+// getUpdatedPropsAndUpdateSummary applies removals and updates to a copy of
+// currentProps and reports what changed.
+//
+// It fails if any key is listed in both removals and updates. Otherwise it
+// returns the new property set together with a summary listing the keys that
+// were removed, the keys whose value changed, and the requested removals that
+// were not present (Missing). currentProps itself is never modified.
+//
+//lint:ignore U1000 this is linked to by catalogs via go:linkname but we don't want to export it
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, PropertiesUpdateSummary, error) {
+       if err := checkForOverlap(removals, updates); err != nil {
+               return nil, PropertiesUpdateSummary{}, err
+       }
+
+       var (
+               updatedProps = maps.Clone(currentProps)
+               removed      = make([]string, 0, len(removals))
+               updated      = make([]string, 0, len(updates))
+       )
+
+       // maps.Clone preserves nil, and assigning into a nil map panics, so
+       // allocate a map before applying updates to a nil property set.
+       if updatedProps == nil && len(updates) > 0 {
+               updatedProps = make(iceberg.Properties, len(updates))
+       }
+
+       for _, key := range removals {
+               if _, exists := updatedProps[key]; exists {
+                       delete(updatedProps, key)
+                       removed = append(removed, key)
+               }
+       }
+
+       // Only keys whose value actually changes are recorded as updated.
+       for key, value := range updates {
+               if updatedProps[key] != value {
+                       updated = append(updated, key)
+                       updatedProps[key] = value
+               }
+       }
+
+       summary := PropertiesUpdateSummary{
+               Removed: removed,
+               Updated: updated,
+               Missing: iceberg.Difference(removals, removed),
+       }
+       return updatedProps, summary, nil
+}
diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go
index 1d62ee6..c9f5950 100644
--- a/catalog/glue/glue.go
+++ b/catalog/glue/glue.go
@@ -21,8 +21,8 @@ import (
        "context"
        "errors"
        "fmt"
-       "maps"
        "strconv"
+       _ "unsafe"
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
@@ -403,6 +403,12 @@ func (c *Catalog) LoadNamespaceProperties(ctx 
context.Context, namespace table.I
        return props, nil
 }
 
+// Link to the unexported catalog.getUpdatedPropsAndUpdateSummary via go:linkname.
+// This avoids a circular dependency and avoids exporting the function, while still
+// letting the catalog implementations reuse it without duplicating the code.
+
+//go:linkname getUpdatedPropsAndUpdateSummary 
github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals 
[]string, updates iceberg.Properties) (iceberg.Properties, 
catalog.PropertiesUpdateSummary, error)
+
 // UpdateNamespaceProperties updates the properties of an Iceberg namespace in 
the Glue catalog.
 // The removals list contains the keys to remove, and the updates map contains 
the keys and values to update.
 func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace 
table.Identifier,
@@ -556,48 +562,3 @@ func filterDatabaseListByType(databases []types.Database, 
databaseType string) [
 
        return filtered
 }
-
-func checkForOverlap(removals []string, updates iceberg.Properties) error {
-       overlap := []string{}
-       for _, key := range removals {
-               if _, ok := updates[key]; ok {
-                       overlap = append(overlap, key)
-               }
-       }
-       if len(overlap) > 0 {
-               return fmt.Errorf("conflict between removals and updates for 
keys: %v", overlap)
-       }
-       return nil
-}
-
-func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals 
[]string, updates iceberg.Properties) (iceberg.Properties, 
catalog.PropertiesUpdateSummary, error) {
-       if err := checkForOverlap(removals, updates); err != nil {
-               return nil, catalog.PropertiesUpdateSummary{}, err
-       }
-       var (
-               updatedProps = maps.Clone(currentProps)
-               removed      = make([]string, 0, len(removals))
-               updated      = make([]string, 0, len(updates))
-       )
-
-       for _, key := range removals {
-               if _, exists := updatedProps[key]; exists {
-                       delete(updatedProps, key)
-                       removed = append(removed, key)
-               }
-       }
-
-       for key, value := range updates {
-               if updatedProps[key] != value {
-                       updated = append(updated, key)
-                       updatedProps[key] = value
-               }
-       }
-
-       summary := catalog.PropertiesUpdateSummary{
-               Removed: removed,
-               Updated: updated,
-               Missing: iceberg.Difference(removals, removed),
-       }
-       return updatedProps, summary, nil
-}
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 1ad2f97..500368b 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -18,8 +18,13 @@
 package internal
 
 import (
+       "encoding/json"
+       "fmt"
+
        "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
 )
 
 type CreateTableCfg struct {
@@ -28,3 +33,28 @@ type CreateTableCfg struct {
        SortOrder     table.SortOrder
        Properties    iceberg.Properties
 }
+
+// GetMetadataLoc returns the path of a metadata JSON file below
+// "<location>/metadata/". The file name is the version zero-padded to five
+// digits, a dash, and a newly generated UUID, ending in ".metadata.json".
+func GetMetadataLoc(location string, newVersion uint) string {
+       fileName := fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String())
+       return location + "/metadata/" + fileName
+}
+
+// WriteMetadata JSON-encodes metadata and writes it to loc.
+//
+// The filesystem is resolved from props and loc via io.LoadFS and must
+// implement io.WriteFileIO; otherwise an error is returned. Errors from
+// loading the filesystem, creating the file, or encoding the metadata are
+// returned to the caller.
+func WriteMetadata(metadata table.Metadata, loc string, props iceberg.Properties) error {
+       fs, err := io.LoadFS(props, loc)
+       if err != nil {
+               return err
+       }
+
+       wfs, ok := fs.(io.WriteFileIO)
+       if !ok {
+               return fmt.Errorf("filesystem IO does not support writing")
+       }
+
+       out, err := wfs.Create(loc)
+       if err != nil {
+               // Propagate the failure: returning nil here would report success
+               // even though no metadata file was written.
+               return err
+       }
+       defer out.Close()
+
+       return json.NewEncoder(out).Encode(metadata)
+}
diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go
new file mode 100644
index 0000000..9972582
--- /dev/null
+++ b/catalog/sql/sql.go
@@ -0,0 +1,721 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sql
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "maps"
+       "path"
+       "slices"
+       "strings"
+       "sync"
+       _ "unsafe"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/internal"
+       "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/uptrace/bun"
+       "github.com/uptrace/bun/dialect/feature"
+       "github.com/uptrace/bun/dialect/mssqldialect"
+       "github.com/uptrace/bun/dialect/mysqldialect"
+       "github.com/uptrace/bun/dialect/oracledialect"
+       "github.com/uptrace/bun/dialect/pgdialect"
+       "github.com/uptrace/bun/dialect/sqlitedialect"
+       "github.com/uptrace/bun/extra/bundebug"
+       "github.com/uptrace/bun/schema"
+)
+
+// SupportedDialect names a SQL dialect the catalog can generate queries for.
+type SupportedDialect string
+
+// Dialect values accepted by NewCatalog.
+const (
+       Postgres SupportedDialect = "postgres"
+       MySQL    SupportedDialect = "mysql"
+       SQLite   SupportedDialect = "sqlite"
+       MSSQL    SupportedDialect = "mssql"
+       Oracle   SupportedDialect = "oracle"
+)
+
+// Property keys consulted when constructing a SQL catalog.
+const (
+       // DialectKey selects the SQL dialect; the registrar lower-cases the value
+       // before converting it to a SupportedDialect.
+       DialectKey           = "sql.dialect"
+       // DriverKey names the database/sql driver handed to sql.Open.
+       DriverKey            = "sql.driver"
+       // initCatalogTablesKey is the boolean property NewCatalog checks before
+       // creating the catalog tables.
+       initCatalogTablesKey = "init_catalog_tables"
+)
+
+// init registers the "sql" catalog type with the catalog registry.
+//
+// The registered constructor requires the "sql.driver" and "sql.dialect"
+// properties, opens the database with sql.Open using the "uri" property
+// (with any "sql://" prefix stripped), and builds the catalog via NewCatalog.
+// A panic raised while building the catalog is converted into an error.
+func init() {
+       catalog.Register("sql", catalog.RegistrarFunc(func(name string, p 
iceberg.Properties) (c catalog.Catalog, err error) {
+               driver, ok := p[DriverKey]
+               if !ok {
+                       return nil, errors.New("must provide driver to pass to 
sql.Open")
+               }
+
+               dialect := strings.ToLower(p[DialectKey])
+               if dialect == "" {
+                       return nil, errors.New("must provide sql dialect to 
use")
+               }
+
+               uri := strings.TrimPrefix(p.Get("uri", ""), "sql://")
+               sqldb, err := sql.Open(driver, uri)
+               if err != nil {
+                       return nil, err
+               }
+
+               // Turn a panic from NewCatalog (e.g. an unsupported dialect)
+               // into the named error result.
+               defer func() {
+                       if r := recover(); r != nil {
+                               err = fmt.Errorf("failed to create SQL catalog: 
%v", r)
+                       }
+               }()
+
+               // NOTE(review): the catalog name is read from the property keyed by
+               // `name`, defaulting to "sql" — confirm this is intended rather than
+               // passing `name` itself.
+               // NOTE(review): sqldb is not closed when NewCatalog fails or panics —
+               // confirm whether it should be.
+               return NewCatalog(p.Get(name, "sql"), sqldb, 
SupportedDialect(dialect), p)
+       }))
+}
+
+// Compile-time check that *Catalog satisfies catalog.Catalog.
+var (
+       _ catalog.Catalog = (*Catalog)(nil)
+)
+
+var (
+       // minimalNamespaceProps is stored for a namespace that is created
+       // without any properties of its own.
+       minimalNamespaceProps = iceberg.Properties{"exists": "true"}
+
+       // dialects caches one schema.Dialect per SupportedDialect; access is
+       // guarded by dialectMx.
+       dialects  = map[SupportedDialect]schema.Dialect{}
+       dialectMx sync.Mutex
+)
+
+func createDialect(d SupportedDialect) schema.Dialect {
+       switch d {
+       case Postgres:
+               return pgdialect.New()
+       case MySQL:
+               return mysqldialect.New()
+       case SQLite:
+               return sqlitedialect.New()
+       case MSSQL:
+               return mssqldialect.New()
+       case Oracle:
+               return oracledialect.New()
+       default:
+               panic("unsupported sql dialect")
+       }
+}
+
+// getDialect returns the cached dialect for d, creating and caching it on
+// first use. The cache is protected by dialectMx.
+func getDialect(d SupportedDialect) schema.Dialect {
+       dialectMx.Lock()
+       defer dialectMx.Unlock()
+
+       if cached, found := dialects[d]; found {
+               return cached
+       }
+
+       created := createDialect(d)
+       dialects[d] = created
+       return created
+}
+
+// sqlIcebergTable is the bun model for a row of the iceberg_tables table.
+// Catalog name, table namespace and table name together form the primary key.
+type sqlIcebergTable struct {
+       bun.BaseModel `bun:"table:iceberg_tables"`
+
+       CatalogName              string `bun:",pk"`
+       TableNamespace           string `bun:",pk"`
+       TableName                string `bun:",pk"`
+       // MetadataLocation is the nullable location of the table's metadata file.
+       MetadataLocation         sql.NullString
+       // PreviousMetadataLocation is nullable; presumably the prior metadata
+       // file location — it is not written by the code visible here.
+       PreviousMetadataLocation sql.NullString
+}
+
+// sqlIcebergNamespaceProps is the bun model for a row of the
+// iceberg_namespace_properties table: one property of one namespace.
+// Catalog name, namespace and property key together form the primary key.
+type sqlIcebergNamespaceProps struct {
+       bun.BaseModel `bun:"table:iceberg_namespace_properties"`
+
+       CatalogName   string `bun:",pk"`
+       Namespace     string `bun:",pk"`
+       PropertyKey   string `bun:",pk"`
+       // PropertyValue is the nullable value stored for PropertyKey.
+       PropertyValue sql.NullString
+}
+
+func withReadTx[R any](ctx context.Context, db *bun.DB, fn 
func(context.Context, bun.Tx) (R, error)) (result R, err error) {
+       db.RunInTx(ctx, &sql.TxOptions{ReadOnly: true}, func(ctx 
context.Context, tx bun.Tx) error {
+               result, err = fn(ctx, tx)
+               return err
+       })
+       return
+}
+
+// withWriteTx runs fn inside a transaction on db that requests the
+// sql.LevelLinearizable isolation level, returning whatever db.RunInTx reports.
+func withWriteTx(ctx context.Context, db *bun.DB, fn func(context.Context, bun.Tx) error) error {
+       opts := &sql.TxOptions{Isolation: sql.LevelLinearizable}
+       return db.RunInTx(ctx, opts, fn)
+}
+
+// Catalog is an Iceberg catalog that stores its table and namespace entries
+// in a SQL database.
+type Catalog struct {
+       // db is the bun handle wrapping the *sql.DB passed to NewCatalog.
+       db    *bun.DB
+       // name is the catalog name used to filter rows by catalog_name.
+       name  string
+       // props holds the catalog properties (e.g. "warehouse").
+       props iceberg.Properties
+}
+
+// NewCatalog creates a new sql-based catalog using the provided sql.DB handle
+// to perform any queries.
+//
+// The dialect parameter determines the SQL dialect to use for query generation
+// and must be one of the exported SupportedDialect values; any other value
+// causes a panic. The separation here allows for the use of different
+// drivers/databases provided they support the chosen sql dialect (e.g. if a
+// particular database supports the MySQL dialect, then the database can still
+// be used with this catalog even though it's not explicitly implemented).
+//
+// Unless the "init_catalog_tables" property is set to a false value (it is
+// treated as true when absent), creating the catalog also creates the
+// necessary tables (iceberg_tables and iceberg_namespace_properties) if they
+// do not already exist. If that step fails, the error is returned together
+// with the constructed catalog.
+//
+// The environment variable ICEBERG_SQL_DEBUG can be set to automatically log
+// the sql queries to the terminal:
+// - ICEBERG_SQL_DEBUG=1 logs only failed queries
+// - ICEBERG_SQL_DEBUG=2 logs all queries
+//
+// All interactions with the db are performed within transactions to ensure
+// atomicity and transactional isolation of catalog changes.
+func NewCatalog(name string, db *sql.DB, dialect SupportedDialect, props 
iceberg.Properties) (*Catalog, error) {
+       cat := &Catalog{db: bun.NewDB(db, getDialect(dialect)), name: name, 
props: props}
+
+       cat.db.AddQueryHook(bundebug.NewQueryHook(
+               bundebug.WithEnabled(false),
+               // ICEBERG_SQL_DEBUG=1 logs only failed queries
+               // ICEBERG_SQL_DEBUG=2 logs all queries
+               bundebug.FromEnv("ICEBERG_SQL_DEBUG")))
+
+       if cat.props.GetBool(initCatalogTablesKey, true) {
+               return cat, cat.ensureTablesExist()
+       }
+
+       return cat, nil
+}
+
+// Name returns the name this catalog was created with.
+func (c *Catalog) Name() string { return c.name }
+
+// CatalogType reports this catalog's type, which is always catalog.SQL.
+func (c *Catalog) CatalogType() catalog.Type {
+       return catalog.SQL
+}
+
+// CreateSQLTables creates the iceberg_tables table and then the
+// iceberg_namespace_properties table, each only if it does not already exist.
+// It stops at and returns the first error encountered.
+func (c *Catalog) CreateSQLTables(ctx context.Context) error {
+       models := []any{(*sqlIcebergTable)(nil), (*sqlIcebergNamespaceProps)(nil)}
+       for _, model := range models {
+               if _, err := c.db.NewCreateTable().Model(model).IfNotExists().Exec(ctx); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// DropSQLTables drops the iceberg_tables table and then the
+// iceberg_namespace_properties table, each only if it exists.
+// It stops at and returns the first error encountered.
+func (c *Catalog) DropSQLTables(ctx context.Context) error {
+       models := []any{(*sqlIcebergTable)(nil), (*sqlIcebergNamespaceProps)(nil)}
+       for _, model := range models {
+               if _, err := c.db.NewDropTable().Model(model).IfExists().Exec(ctx); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// ensureTablesExist creates the catalog tables if needed, using a background
+// context because NewCatalog does not receive one.
+func (c *Catalog) ensureTablesExist() error {
+       return c.CreateSQLTables(context.Background())
+}
+
+// namespaceExists reports whether ns is known to this catalog. A namespace
+// counts as existing if at least one table row references it or if it has at
+// least one stored namespace property. Both lookups run in a single read-only
+// transaction.
+func (c *Catalog) namespaceExists(ctx context.Context, ns string) (bool, 
error) {
+       return withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
(bool, error) {
+               // First look for any table registered under the namespace.
+               exists, err := tx.NewSelect().Model((*sqlIcebergTable)(nil)).
+                       Where("catalog_name = ?", c.name).
+                       Where("table_namespace = ?", ns).
+                       Limit(1).Exists(ctx)
+               if err != nil {
+                       return false, err
+               }
+               if exists {
+                       return true, nil
+               }
+
+               // Otherwise fall back to the namespace properties table.
+               return tx.NewSelect().Model((*sqlIcebergNamespaceProps)(nil)).
+                       Where("catalog_name = ?", c.name).Where("namespace = 
?", ns).
+                       Limit(1).Exists(ctx)
+       })
+}
+
+func (c *Catalog) getDefaultWarehouseLocation(ctx context.Context, nsname, 
tableName string) (string, error) {
+       dbprops, err := c.LoadNamespaceProperties(ctx, strings.Split(nsname, 
"."))
+       if err != nil {
+               return "", err
+       }
+
+       if dblocation := dbprops.Get("location", ""); dblocation != "" {
+               return path.Join(dblocation, tableName), nil
+       }
+
+       if warehousepath := c.props.Get("warehouse", ""); warehousepath != "" {
+               return warehousepath + "/" + path.Join(nsname+".db", 
tableName), nil
+       }
+
+       return "", errors.New("no default path set, please specify a location 
when creating a table")
+}
+
+// resolveTableLocation returns loc without a trailing "/" when loc is
+// non-empty; for an empty loc it computes the default location for the given
+// namespace and table name.
+func (c *Catalog) resolveTableLocation(ctx context.Context, loc, nsname, tablename string) (string, error) {
+       if loc != "" {
+               return strings.TrimSuffix(loc, "/"), nil
+       }
+
+       return c.getDefaultWarehouseLocation(ctx, nsname, tablename)
+}
+
+// checkValidNamespace rejects an empty identifier with an error wrapping
+// catalog.ErrNoSuchNamespace; any non-empty identifier is accepted.
+func checkValidNamespace(ident table.Identifier) error {
+       if len(ident) == 0 {
+               return fmt.Errorf("%w: empty namespace identifier", catalog.ErrNoSuchNamespace)
+       }
+
+       return nil
+}
+
+// CreateTable creates a new table named by ident with schema sc.
+//
+// The namespace part of ident must already exist, otherwise an error wrapping
+// catalog.ErrNoSuchNamespace is returned. The table location comes from the
+// supplied options or, when none is given, from the namespace/warehouse
+// defaults. The initial metadata file is written first, then the table row is
+// inserted in a write transaction, and finally the table is loaded and
+// returned.
+func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc 
*iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+       var cfg internal.CreateTableCfg
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       nsIdent := catalog.NamespaceFromIdent(ident)
+       tblIdent := catalog.TableNameFromIdent(ident)
+       ns := strings.Join(nsIdent, ".")
+       exists, err := c.namespaceExists(ctx, ns)
+       if err != nil {
+               return nil, err
+       }
+
+       if !exists {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns)
+       }
+
+       loc, err := c.resolveTableLocation(ctx, cfg.Location, ns, tblIdent)
+       if err != nil {
+               return nil, err
+       }
+
+       // Version 0 metadata file for the newly created table.
+       metadataLocation := internal.GetMetadataLoc(loc, 0)
+       metadata, err := table.NewMetadata(sc, cfg.PartitionSpec, 
cfg.SortOrder, loc, cfg.Properties)
+       if err != nil {
+               return nil, err
+       }
+
+       // NOTE(review): the metadata file is written before the row is inserted;
+       // if the insert below fails the file is left behind — confirm acceptable.
+       if err := internal.WriteMetadata(metadata, metadataLocation, c.props); 
err != nil {
+               return nil, err
+       }
+
+       err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error 
{
+               _, err := tx.NewInsert().Model(&sqlIcebergTable{
+                       CatalogName:      c.name,
+                       TableNamespace:   ns,
+                       TableName:        tblIdent,
+                       MetadataLocation: sql.NullString{String: 
metadataLocation, Valid: true},
+               }).Exec(ctx)
+
+               if err != nil {
+                       return fmt.Errorf("failed to create table: %w", err)
+               }
+               return nil
+       })
+
+       if err != nil {
+               return nil, err
+       }
+
+       return c.LoadTable(ctx, ident, cfg.Properties)
+}
+
+// CommitTable is not implemented for the SQL catalog yet; calling it panics.
+func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, reqs 
[]table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+       // NOTE(review): this panics rather than returning an error — confirm
+       // callers never reach it through the catalog.Catalog interface.
+       panic("commit table not implemented for SQLCatalog")
+}
+
+func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, 
props iceberg.Properties) (*table.Table, error) {
+       ns := catalog.NamespaceFromIdent(identifier)
+       tbl := catalog.TableNameFromIdent(identifier)
+
+       if props == nil {
+               props = iceberg.Properties{}
+       }
+
+       result, err := withReadTx(ctx, c.db, func(ctx context.Context, tx 
bun.Tx) (*sqlIcebergTable, error) {
+               t := new(sqlIcebergTable)
+               err := tx.NewSelect().Model(t).
+                       Where("catalog_name = ?", c.name).
+                       Where("table_namespace = ?", strings.Join(ns, ".")).
+                       Where("table_name = ?", tbl).
+                       Scan(ctx)
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, fmt.Errorf("%w: %s", 
catalog.ErrNoSuchTable, identifier)
+               }
+
+               if err != nil {
+                       return nil, fmt.Errorf("error encountered loading table 
%s: %w", identifier, err)
+               }
+               return t, nil
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       if !result.MetadataLocation.Valid {
+               return nil, fmt.Errorf("%w: %s, metadata location is missing", 
catalog.ErrNoSuchTable, identifier)
+       }
+
+       tblProps := maps.Clone(c.props)
+       maps.Copy(props, tblProps)
+
+       iofs, err := io.LoadFS(tblProps, result.MetadataLocation.String)
+       if err != nil {
+               return nil, err
+       }
+       return table.NewFromLocation(identifier, 
result.MetadataLocation.String, iofs)
+}
+
+// DropTable removes the catalog entry for the table named by identifier.
+// The delete runs in a write transaction; if no row was affected an error
+// wrapping catalog.ErrNoSuchTable is returned. Only the catalog row is
+// deleted here.
+func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) 
error {
+       ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+       tbl := catalog.TableNameFromIdent(identifier)
+
+       return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
error {
+               // Delete by primary key (catalog name, namespace, table name).
+               res, err := tx.NewDelete().Model(&sqlIcebergTable{
+                       CatalogName:    c.name,
+                       TableNamespace: ns,
+                       TableName:      tbl,
+               }).WherePK().Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("failed to delete table entry: %w", 
err)
+               }
+
+               n, err := res.RowsAffected()
+               if err != nil {
+                       return fmt.Errorf("error encountered when deleting 
table entry: %w", err)
+               }
+
+               // Zero affected rows means the table was not registered.
+               if n == 0 {
+                       return fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, 
identifier)
+               }
+               return nil
+       })
+}
+
+func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
+       fromNs := strings.Join(catalog.NamespaceFromIdent(from), ".")
+       fromTbl := catalog.TableNameFromIdent(from)
+
+       toNs := strings.Join(catalog.NamespaceFromIdent(to), ".")
+       toTbl := catalog.TableNameFromIdent(to)
+
+       exists, err := c.namespaceExists(ctx, toNs)
+       if err != nil {
+               return nil, err
+       }
+       if !exists {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, 
toNs)
+       }
+
+       err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error 
{
+               exists, err := tx.NewSelect().Model(&sqlIcebergTable{
+                       CatalogName:    c.name,
+                       TableNamespace: toNs,
+                       TableName:      toTbl,
+               }).WherePK().Exists(ctx)
+               if err != nil {
+                       return fmt.Errorf("error encountered checking existence 
of table '%s': %w", to, err)
+               }
+
+               if exists {
+                       return catalog.ErrTableAlreadyExists
+               }
+
+               res, err := tx.NewUpdate().Model(&sqlIcebergTable{
+                       CatalogName:    c.name,
+                       TableNamespace: fromNs,
+                       TableName:      fromTbl,
+               }).WherePK().
+                       Set("table_namespace = ?", toNs).
+                       Set("table_name = ?", toTbl).
+                       Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("error renaming table from '%s' to 
%s': %w", from, to, err)
+               }
+
+               n, err := res.RowsAffected()
+               if err != nil {
+                       return fmt.Errorf("error renaming table from '%s' to 
%s': %w", from, to, err)
+               }
+
+               if n == 0 {
+                       return fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, 
from)
+               }
+               return nil
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       return c.LoadTable(ctx, to, nil)
+}
+
+func (c *Catalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
+       if err := checkValidNamespace(namespace); err != nil {
+               return err
+       }
+
+       exists, err := c.namespaceExists(ctx, strings.Join(namespace, "."))
+       if err != nil {
+               return err
+       }
+
+       if exists {
+               return fmt.Errorf("%w: %s", catalog.ErrNamespaceAlreadyExists, 
strings.Join(namespace, "."))
+       }
+
+       if len(props) == 0 {
+               props = minimalNamespaceProps
+       }
+
+       nsToCreate := strings.Join(namespace, ".")
+       return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
error {
+               toInsert := make([]sqlIcebergNamespaceProps, 0, len(props))
+               for k, v := range props {
+                       toInsert = append(toInsert, sqlIcebergNamespaceProps{
+                               CatalogName:   c.name,
+                               Namespace:     nsToCreate,
+                               PropertyKey:   k,
+                               PropertyValue: sql.NullString{String: v, Valid: 
true},
+                       })
+               }
+
+               _, err := tx.NewInsert().Model(&toInsert).Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("error inserting namespace properties 
for namespace '%s': %w", namespace, err)
+               }
+               return nil
+       })
+}
+
+// DropNamespace deletes every stored property row of namespace, which removes
+// the namespace from the catalog.
+//
+// It fails for an empty identifier, returns an error wrapping
+// catalog.ErrNoSuchNamespace when the namespace is unknown, and an error
+// wrapping catalog.ErrNamespaceNotEmpty when tables are still registered in it.
+func (c *Catalog) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
+       if err := checkValidNamespace(namespace); err != nil {
+               return err
+       }
+
+       nsToDelete := strings.Join(namespace, ".")
+
+       exists, err := c.namespaceExists(ctx, nsToDelete)
+       if err != nil {
+               return err
+       }
+
+       if !exists {
+               return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, 
nsToDelete)
+       }
+
+       // Refuse to drop a namespace that still contains tables.
+       tbls, err := c.ListTables(ctx, namespace)
+       if err != nil {
+               return err
+       }
+
+       if len(tbls) > 0 {
+               return fmt.Errorf("%w: %d tables exist in namespace %s", 
catalog.ErrNamespaceNotEmpty, len(tbls), nsToDelete)
+       }
+
+       // NOTE(review): the checks above run outside this transaction, so a
+       // table could be created in between — confirm that is acceptable.
+       return withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
error {
+               _, err := 
tx.NewDelete().Model((*sqlIcebergNamespaceProps)(nil)).
+                       Where("catalog_name = ?", c.name).
+                       Where("namespace = ?", nsToDelete).Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("error deleting namespace '%s': %w", 
namespace, err)
+               }
+               return nil
+       })
+}
+
+// LoadNamespaceProperties returns the properties stored for namespace as a
+// key/value map, read in a read-only transaction. It fails for an empty
+// identifier and returns an error wrapping catalog.ErrNoSuchNamespace when
+// the namespace does not exist.
+func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
+       if err := checkValidNamespace(namespace); err != nil {
+               return nil, err
+       }
+
+       nsName := strings.Join(namespace, ".")
+       found, err := c.namespaceExists(ctx, nsName)
+       if err != nil {
+               return nil, err
+       }
+       if !found {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, nsName)
+       }
+
+       loadProps := func(ctx context.Context, tx bun.Tx) (iceberg.Properties, error) {
+               var rows []sqlIcebergNamespaceProps
+               query := tx.NewSelect().Model(&rows).
+                       Where("catalog_name = ?", c.name).
+                       Where("namespace = ?", nsName)
+               if err := query.Scan(ctx); err != nil {
+                       return nil, fmt.Errorf("error loading namespace properties for '%s': %w", namespace, err)
+               }
+
+               out := make(iceberg.Properties, len(rows))
+               for i := range rows {
+                       out[rows[i].PropertyKey] = rows[i].PropertyValue.String
+               }
+               return out, nil
+       }
+
+       return withReadTx(ctx, c.db, loadProps)
+}
+
+// ListTables returns the identifiers of all tables registered directly under
+// namespace in this catalog. For a non-empty namespace that does not exist,
+// an error wrapping catalog.ErrNoSuchNamespace is returned.
+func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) 
([]table.Identifier, error) {
+       // The existence check is skipped for an empty namespace identifier.
+       if len(namespace) > 0 {
+               exists, err := c.namespaceExists(ctx, strings.Join(namespace, 
"."))
+               if err != nil {
+                       return nil, err
+               }
+               if !exists {
+                       return nil, fmt.Errorf("%w: %s", 
catalog.ErrNoSuchNamespace, strings.Join(namespace, "."))
+               }
+       }
+
+       ns := strings.Join(namespace, ".")
+       tables, err := withReadTx(ctx, c.db, func(ctx context.Context, tx 
bun.Tx) ([]sqlIcebergTable, error) {
+               var tables []sqlIcebergTable
+               err := tx.NewSelect().Model(&tables).
+                       Where("catalog_name = ?", c.name).
+                       Where("table_namespace = ?", ns).
+                       Scan(ctx)
+               return tables, err
+       })
+       if err != nil {
+               return nil, fmt.Errorf("error listing tables for namespace 
'%s': %w", namespace, err)
+       }
+
+       // Rebuild each identifier as the dot-split namespace followed by the
+       // table name.
+       ret := make([]table.Identifier, len(tables))
+       for i, t := range tables {
+               ret[i] = append(strings.Split(t.TableNamespace, "."), 
t.TableName)
+       }
+       return ret, nil
+}
+
+// ListNamespaces returns the namespaces known to this catalog. Namespaces are
+// collected from both the sqlIcebergTable rows (table_namespace) and the
+// sqlIcebergNamespaceProps rows (namespace), combined with a SQL UNION.
+//
+// With an empty parent every namespace of the catalog is returned. Otherwise
+// the parent must exist (the error wraps catalog.ErrNoSuchNamespace if it does
+// not) and the result is limited to the parent itself and the namespaces
+// nested beneath it, i.e. names equal to "parent" or starting with "parent.".
+// The order of the returned namespaces is whatever the database produces; no
+// ORDER BY is applied.
+func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
+       tableQuery := c.db.NewSelect().Model((*sqlIcebergTable)(nil)).
+               Column("table_namespace").Where("catalog_name = ?", c.name)
+       nsQuery := c.db.NewSelect().Model((*sqlIcebergNamespaceProps)(nil)).
+               Column("namespace").Where("catalog_name = ?", c.name)
+
+       if len(parent) > 0 {
+               ns := strings.Join(parent, ".")
+               exists, err := c.namespaceExists(ctx, ns)
+               if err != nil {
+                       return nil, err
+               }
+               if !exists {
+                       return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns)
+               }
+
+               // Match the parent exactly or anything below "parent.". A bare
+               // "parent%" pattern would also pick up unrelated siblings that
+               // merely share a textual prefix (e.g. "ab" matching "abc").
+               // NOTE: '%' and '_' inside the parent name are still treated as
+               // LIKE wildcards by the database.
+               children := ns + ".%"
+               tableQuery = tableQuery.Where("(table_namespace = ? OR table_namespace like ?)", ns, children)
+               nsQuery = nsQuery.Where("(namespace = ? OR namespace like ?)", ns, children)
+       }
+
+       namespaces, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) ([]string, error) {
+               var namespaces []string
+
+               // The two SELECTs are rendered to SQL text and joined by UNION so
+               // that duplicates across both sources collapse into one row.
+               rows, err := tx.QueryContext(ctx, tableQuery.String()+" UNION "+nsQuery.String())
+               if err != nil {
+                       return nil, fmt.Errorf("error listing namespaces for '%s': %w", parent, err)
+               }
+
+               err = c.db.ScanRows(ctx, rows, &namespaces)
+               return namespaces, err
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       ret := make([]table.Identifier, len(namespaces))
+       for i, n := range namespaces {
+               ret[i] = strings.Split(n, ".")
+       }
+       return ret, nil
+}
+
+// getUpdatedPropsAndUpdateSummary has no body here: the go:linkname directive
+// below binds it to the unexported function of the same name in the catalog
+// package. Linking avoids a circular dependency while still avoiding having to
+// export the function, so catalog implementations can re-use it without
+// duplicating the code.
+
+//go:linkname getUpdatedPropsAndUpdateSummary 
github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals 
[]string, updates iceberg.Properties) (iceberg.Properties, 
catalog.PropertiesUpdateSummary, error)
+
+// UpdateNamespaceProperties removes the property keys listed in removals and
+// inserts or overwrites the key/value pairs in updates for namespace.
+//
+// The current properties are loaded first (so a missing namespace is reported
+// by LoadNamespaceProperties) and the summary is computed by
+// getUpdatedPropsAndUpdateSummary. The database changes then run in a single
+// write transaction: removals are deleted, and updates are upserted using
+// ON CONFLICT or ON DUPLICATE KEY when the dialect reports that feature, or by
+// deleting the affected keys before a plain insert otherwise.
+func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (catalog.PropertiesUpdateSummary, error) {
+       var summary catalog.PropertiesUpdateSummary
+
+       current, err := c.LoadNamespaceProperties(ctx, namespace)
+       if err != nil {
+               return summary, err
+       }
+
+       if _, summary, err = getUpdatedPropsAndUpdateSummary(current, removals, updates); err != nil {
+               return summary, err
+       }
+
+       nsName := strings.Join(namespace, ".")
+
+       // deleteKeys removes the given property keys of this namespace within tx.
+       deleteKeys := func(ctx context.Context, tx bun.Tx, keys []string) error {
+               _, err := tx.NewDelete().Model((*sqlIcebergNamespaceProps)(nil)).
+                       Where("catalog_name = ?", c.name).
+                       Where("namespace = ?", nsName).
+                       Where("property_key in (?)", bun.In(keys)).
+                       Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("error deleting properties for '%s': %w", namespace, err)
+               }
+               return nil
+       }
+
+       // applyChanges performs the removals followed by the upsert of updates.
+       applyChanges := func(ctx context.Context, tx bun.Tx) error {
+               if len(removals) > 0 {
+                       if err := deleteKeys(ctx, tx, removals); err != nil {
+                               return err
+                       }
+               }
+
+               if len(updates) == 0 {
+                       return nil
+               }
+
+               rows := make([]sqlIcebergNamespaceProps, 0, len(updates))
+               for key, value := range updates {
+                       rows = append(rows, sqlIcebergNamespaceProps{
+                               CatalogName:   c.name,
+                               Namespace:     nsName,
+                               PropertyKey:   key,
+                               PropertyValue: sql.NullString{String: value, Valid: true},
+                       })
+               }
+
+               insert := tx.NewInsert().Model(&rows)
+               switch {
+               case c.db.HasFeature(feature.InsertOnConflict):
+                       insert = insert.On("CONFLICT (catalog_name, namespace, property_key) DO UPDATE").
+                               Set("property_value = EXCLUDED.property_value")
+               case c.db.HasFeature(feature.InsertOnDuplicateKey):
+                       insert = insert.On("DUPLICATE KEY UPDATE")
+               default:
+                       // No native upsert: clear the keys so the insert cannot collide.
+                       if err := deleteKeys(ctx, tx, slices.Collect(maps.Keys(updates))); err != nil {
+                               return err
+                       }
+               }
+
+               if _, err := insert.Exec(ctx); err != nil {
+                       return fmt.Errorf("error updating namespace properties for '%s': %w", namespace, err)
+               }
+               return nil
+       }
+
+       return summary, withWriteTx(ctx, c.db, applyChanges)
+}
diff --git a/catalog/sql/sql_test.go b/catalog/sql/sql_test.go
new file mode 100644
index 0000000..5e26c63
--- /dev/null
+++ b/catalog/sql/sql_test.go
@@ -0,0 +1,945 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sql_test
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+       "maps"
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       sqlcat "github.com/apache/iceberg-go/catalog/sql"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/suite"
+       "github.com/uptrace/bun/driver/sqliteshim"
+       "golang.org/x/exp/rand"
+)
+
+// tableSchemaNested is the schema shared by the table-creation tests. It is
+// built with iceberg.NewSchemaWithIdentifiers(1, []int{1}, ...) and mixes
+// primitive columns (foo, bar, baz) with a list (qux), a map of maps (quux),
+// a list of structs (location) and a struct (person).
+var tableSchemaNested = iceberg.NewSchemaWithIdentifiers(1, []int{1},
+       iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false},
+       iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+       iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false},
+       iceberg.NestedField{
+               ID:       4,
+               Name:     "qux",
+               Required: true,
+               Type: &iceberg.ListType{
+                       ElementID:       5,
+                       Element:         iceberg.PrimitiveTypes.String,
+                       ElementRequired: true,
+               },
+       },
+       iceberg.NestedField{
+               ID:       6,
+               Name:     "quux",
+               Required: true,
+               Type: &iceberg.MapType{
+                       KeyID:   7,
+                       KeyType: iceberg.PrimitiveTypes.String,
+                       ValueID: 8,
+                       ValueType: &iceberg.MapType{
+                               KeyID:         9,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       10,
+                               ValueType:     iceberg.PrimitiveTypes.Int32,
+                               ValueRequired: true,
+                       },
+                       ValueRequired: true,
+               },
+       },
+       iceberg.NestedField{
+               ID:       11,
+               Name:     "location",
+               Required: true,
+               Type: &iceberg.ListType{
+                       ElementID: 12,
+                       Element: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 13, Name: "latitude", Type: iceberg.PrimitiveTypes.Float32, Required: false},
+                                       {ID: 14, Name: "longitude", Type: iceberg.PrimitiveTypes.Float32, Required: false},
+                               },
+                       },
+                       ElementRequired: true,
+               },
+       },
+       iceberg.NestedField{
+               ID:       15,
+               Name:     "person",
+               Required: false,
+               Type: &iceberg.StructType{
+                       FieldList: []iceberg.NestedField{
+                               {ID: 16, Name: "name", Type: iceberg.PrimitiveTypes.String, Required: false},
+                               {ID: 17, Name: "age", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+                       },
+               },
+       },
+)
+
+// TestCreateSQLCatalogNoDriverDialect checks that loading the "sql" catalog
+// fails when no properties are given and when only the driver is given.
+func TestCreateSQLCatalogNoDriverDialect(t *testing.T) {
+       incomplete := []iceberg.Properties{
+               {},
+               {sqlcat.DriverKey: "sqlite"},
+       }
+
+       for _, props := range incomplete {
+               _, err := catalog.Load("sql", props)
+               assert.Error(t, err)
+       }
+}
+
+// TestInvalidDialect checks that loading the "sql" catalog with the dialect
+// "foobar" returns an error.
+func TestInvalidDialect(t *testing.T) {
+       props := iceberg.Properties{
+               sqlcat.DriverKey:  sqliteshim.ShimName,
+               sqlcat.DialectKey: "foobar",
+       }
+
+       _, err := catalog.Load("sql", props)
+       assert.Error(t, err)
+}
+
+// randomString returns a string of n letters, each drawn with rand.Intn from
+// the lower- and upper-case ASCII alphabet.
+func randomString(n int) string {
+       const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+
+       buf := make([]byte, n)
+       for i := range buf {
+               buf[i] = alphabet[rand.Intn(len(alphabet))]
+       }
+       return string(buf)
+}
+
+// randomLen is the number of random letters requested from randomString when
+// generating database, table and namespace names.
+const randomLen = 20
+
+// databaseName returns a lower-case name of the form
+// "my-iceberg-db-<random letters>".
+func databaseName() string {
+       suffix := randomString(randomLen)
+       return strings.ToLower("my-iceberg-db-" + suffix)
+}
+
+// tableName returns a lower-case name of the form
+// "my-iceberg-table-<random letters>".
+func tableName() string {
+       suffix := randomString(randomLen)
+       return strings.ToLower("my-iceberg-table-" + suffix)
+}
+
+// hiearchicalNamespaceName returns a two-level namespace name: two
+// "my-iceberg-ns-<random letters>" parts joined by ".". Unlike databaseName
+// and tableName, the result is not lower-cased.
+func hiearchicalNamespaceName() string {
+       const prefix = "my-iceberg-ns-"
+
+       outer := prefix + randomString(randomLen)
+       inner := prefix + randomString(randomLen)
+       return outer + "." + inner
+}
+
+// SqliteCatalogTestSuite is a testify suite exercising the SQL catalog against
+// SQLite databases.
+type SqliteCatalogTestSuite struct {
+       suite.Suite
+
+       // warehouse is the temporary directory created in SetupTest and removed
+       // in TearDownTest; the catalog database file and table directories are
+       // placed beneath it.
+       warehouse string
+}
+
+// randomTableIdentifier returns a fresh {database, table} identifier and
+// creates the "<warehouse>/<database>.db/<table>/metadata" directory for it.
+func (s *SqliteCatalogTestSuite) randomTableIdentifier() table.Identifier {
+       ident := table.Identifier{databaseName(), tableName()}
+
+       metadataDir := filepath.Join(s.warehouse, ident[0]+".db", ident[1], "metadata")
+       s.Require().NoError(os.MkdirAll(metadataDir, 0755))
+
+       return ident
+}
+
+// randomHierarchicalIdentifier returns a fresh identifier whose namespace has
+// two levels, and creates the
+// "<warehouse>/<namespace>.db/<table>/metadata" directory for it.
+func (s *SqliteCatalogTestSuite) randomHierarchicalIdentifier() table.Identifier {
+       nsName, tblName := hiearchicalNamespaceName(), tableName()
+
+       metadataDir := filepath.Join(s.warehouse, nsName+".db", tblName, "metadata")
+       s.Require().NoError(os.MkdirAll(metadataDir, 0755))
+
+       qualified := nsName + "." + tblName
+       return strings.Split(qualified, ".")
+}
+
+// SetupTest creates a fresh temporary warehouse directory (pattern
+// "test_sql_*" in the default temp dir) and stores it in s.warehouse.
+func (s *SqliteCatalogTestSuite) SetupTest() {
+       // An empty dir argument makes os.MkdirTemp use os.TempDir().
+       dir, err := os.MkdirTemp("", "test_sql_*")
+       s.warehouse = dir
+       s.Require().NoError(err)
+}
+
+// catalogUri returns the "file://" URI of the "sql-catalog.db" file inside the
+// warehouse directory.
+func (s *SqliteCatalogTestSuite) catalogUri() string {
+       dbPath := filepath.Join(s.warehouse, "sql-catalog.db")
+       return "file://" + dbPath
+}
+
+// confirmNoTables asserts that neither "iceberg_tables" nor
+// "iceberg_namespace_properties" is listed as a table in sqlite_master of db.
+func (s *SqliteCatalogTestSuite) confirmNoTables(db *sql.DB) {
+       rows, err := db.Query("SELECT name FROM sqlite_master WHERE type='table'")
+       s.Require().NoError(err)
+       defer rows.Close()
+
+       var names []string
+       for rows.Next() {
+               var name string
+               s.Require().NoError(rows.Scan(&name))
+               names = append(names, name)
+       }
+       // rows.Next also returns false when iteration fails; without this check
+       // a truncated listing would make the NotContains assertions pass vacuously.
+       s.Require().NoError(rows.Err())
+
+       s.NotContains(names, "iceberg_tables")
+       s.NotContains(names, "iceberg_namespace_properties")
+}
+
+// confirmTablesExist asserts that both "iceberg_tables" and
+// "iceberg_namespace_properties" are listed as tables in sqlite_master of db.
+func (s *SqliteCatalogTestSuite) confirmTablesExist(db *sql.DB) {
+       rows, err := db.Query("SELECT name FROM sqlite_master WHERE type='table'")
+       s.Require().NoError(err)
+       defer rows.Close()
+
+       var names []string
+       for rows.Next() {
+               var name string
+               s.Require().NoError(rows.Scan(&name))
+               names = append(names, name)
+       }
+       // rows.Next also returns false when iteration fails; surface that error
+       // instead of reporting a misleading "table missing" assertion failure.
+       s.Require().NoError(rows.Err())
+
+       s.Contains(names, "iceberg_tables")
+       s.Contains(names, "iceberg_namespace_properties")
+}
+
+// loadCatalogForTableCreation loads the "default" SQL catalog on the suite's
+// SQLite file with "init_catalog_tables" set to "true" and returns it as a
+// *sqlcat.Catalog.
+func (s *SqliteCatalogTestSuite) loadCatalogForTableCreation() *sqlcat.Catalog {
+       props := iceberg.Properties{
+               "uri":                 s.catalogUri(),
+               sqlcat.DriverKey:      sqliteshim.ShimName,
+               sqlcat.DialectKey:     string(sqlcat.SQLite),
+               "type":                "sql",
+               "init_catalog_tables": "true",
+       }
+
+       loaded, err := catalog.Load("default", props)
+       s.Require().NoError(err)
+
+       return loaded.(*sqlcat.Catalog)
+}
+
+// TearDownTest removes the warehouse directory and everything beneath it.
+func (s *SqliteCatalogTestSuite) TearDownTest() {
+       err := os.RemoveAll(s.warehouse)
+       s.Require().NoError(err)
+}
+
+// getDB opens a database/sql handle on the suite's SQLite catalog file (see
+// catalogUri) using the sqliteshim driver.
+//
+// The handle is closed through the current test's Cleanup, so tests that call
+// getDB no longer leave the database handle open after they finish.
+func (s *SqliteCatalogTestSuite) getDB() *sql.DB {
+       sqldb, err := sql.Open(sqliteshim.ShimName, s.catalogUri())
+       s.Require().NoError(err)
+       s.T().Cleanup(func() {
+               // Best-effort: a failing Close must not change the test outcome.
+               _ = sqldb.Close()
+       })
+
+       return sqldb
+}
+
+// getCatalogMemory loads the "default" SQL catalog with the ":memory:" URI and
+// the suite's warehouse directory, and returns it as a *sqlcat.Catalog.
+func (s *SqliteCatalogTestSuite) getCatalogMemory() *sqlcat.Catalog {
+       props := iceberg.Properties{
+               "uri":             ":memory:",
+               sqlcat.DriverKey:  sqliteshim.ShimName,
+               sqlcat.DialectKey: string(sqlcat.SQLite),
+               "type":            "sql",
+               "warehouse":       "file://" + s.warehouse,
+       }
+
+       loaded, err := catalog.Load("default", props)
+       s.Require().NoError(err)
+
+       return loaded.(*sqlcat.Catalog)
+}
+
+// getCatalogSqlite loads the "default" SQL catalog backed by the suite's
+// SQLite file (see catalogUri) and the suite's warehouse directory, and
+// returns it as a *sqlcat.Catalog.
+func (s *SqliteCatalogTestSuite) getCatalogSqlite() *sqlcat.Catalog {
+       props := iceberg.Properties{
+               "uri":             s.catalogUri(),
+               sqlcat.DriverKey:  sqliteshim.ShimName,
+               sqlcat.DialectKey: string(sqlcat.SQLite),
+               "type":            "sql",
+               "warehouse":       "file://" + s.warehouse,
+       }
+
+       loaded, err := catalog.Load("default", props)
+       s.Require().NoError(err)
+
+       return loaded.(*sqlcat.Catalog)
+}
+
+// TestSqlCatalogType checks that both the in-memory and the file-backed
+// catalog report catalog.SQL as their type.
+func (s *SqliteCatalogTestSuite) TestSqlCatalogType() {
+       for _, cat := range []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()} {
+               s.Equal(catalog.SQL, cat.CatalogType())
+       }
+}
+
+// TestCreationNoTablesExist checks that loading the catalog on a database
+// without catalog tables creates them.
+func (s *SqliteCatalogTestSuite) TestCreationNoTablesExist() {
+       db := s.getDB()
+       s.confirmNoTables(db)
+
+       // Only the side effect of loading matters; the catalog itself is unused.
+       s.loadCatalogForTableCreation()
+
+       s.confirmTablesExist(db)
+}
+
+// TestCreationOneTableExists checks that loading the catalog succeeds, and
+// both catalog tables exist afterwards, when "iceberg_tables" was already
+// created by hand.
+func (s *SqliteCatalogTestSuite) TestCreationOneTableExists() {
+       const createIcebergTables = `CREATE TABLE "iceberg_tables" (
+               "catalog_name" VARCHAR NOT NULL, 
+               "table_namespace" VARCHAR NOT NULL, 
+               "table_name" VARCHAR NOT NULL, 
+               "metadata_location" VARCHAR, 
+               "previous_metadata_location" VARCHAR, 
+               PRIMARY KEY ("catalog_name", "table_namespace", "table_name"))`
+
+       db := s.getDB()
+       s.confirmNoTables(db)
+
+       _, err := db.Exec(createIcebergTables)
+       s.Require().NoError(err)
+
+       // Only the side effect of loading matters; the catalog itself is unused.
+       s.loadCatalogForTableCreation()
+
+       s.confirmTablesExist(db)
+}
+
+// TestCreationAllTablesExist checks that loading the catalog succeeds, and
+// both catalog tables still exist afterwards, when both were already created
+// by hand.
+func (s *SqliteCatalogTestSuite) TestCreationAllTablesExist() {
+       const createIcebergTables = `CREATE TABLE "iceberg_tables" (
+               "catalog_name" VARCHAR NOT NULL, 
+               "table_namespace" VARCHAR NOT NULL, 
+               "table_name" VARCHAR NOT NULL, 
+               "metadata_location" VARCHAR, 
+               "previous_metadata_location" VARCHAR, 
+               PRIMARY KEY ("catalog_name", "table_namespace", "table_name"))`
+
+       const createNamespaceProps = `CREATE TABLE "iceberg_namespace_properties" (
+               "catalog_name" VARCHAR NOT NULL, 
+               "namespace" VARCHAR NOT NULL, 
+               "property_key" VARCHAR NOT NULL, 
+               "property_value" VARCHAR, 
+               PRIMARY KEY ("catalog_name", "namespace", "property_key"))`
+
+       db := s.getDB()
+       s.confirmNoTables(db)
+
+       _, err := db.Exec(createIcebergTables)
+       s.Require().NoError(err)
+
+       _, err = db.Exec(createNamespaceProps)
+       s.Require().NoError(err)
+
+       // Only the side effect of loading matters; the catalog itself is unused.
+       s.loadCatalogForTableCreation()
+
+       s.confirmTablesExist(db)
+}
+
+// TestDropSQLTablesIdempotency checks that DropSQLTables removes the catalog
+// tables and that calling it a second time also succeeds.
+func (s *SqliteCatalogTestSuite) TestDropSQLTablesIdempotency() {
+       db := s.getDB()
+       s.confirmNoTables(db)
+
+       cat := s.loadCatalogForTableCreation()
+       s.confirmTablesExist(db)
+
+       ctx := context.Background()
+       s.NoError(cat.DropSQLTables(ctx))
+       s.confirmNoTables(db)
+
+       // Dropping again with nothing left to drop must not fail.
+       s.NoError(cat.DropSQLTables(ctx))
+}
+
+// TestCreateTablesIdempotency checks that CreateSQLTables can be called twice
+// in a row on each catalog without error.
+func (s *SqliteCatalogTestSuite) TestCreateTablesIdempotency() {
+       ctx := context.Background()
+
+       for _, cat := range []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()} {
+               for attempt := 0; attempt < 2; attempt++ {
+                       s.NoError(cat.CreateSQLTables(ctx))
+               }
+       }
+}
+
+// TestCreateTableNonExistingNamespace checks that CreateTable fails with
+// catalog.ErrNoSuchNamespace when the identifier's namespace was never created.
+func (s *SqliteCatalogTestSuite) TestCreateTableNonExistingNamespace() {
+       ctx := context.Background()
+       ident := table.Identifier{"default", "non_existing_namespace"}
+
+       for _, cat := range []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()} {
+               _, err := cat.CreateTable(ctx, ident, tableSchemaNested)
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestCreateTableDefaultSortOrder checks that a table created without an
+// explicit sort order has sort order ID 0 and that its metadata file exists.
+func (s *SqliteCatalogTestSuite) TestCreateTableDefaultSortOrder() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, catalog.NamespaceFromIdent(tc.ident), nil))
+
+               tbl, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested)
+               s.Require().NoError(err)
+
+               metadataPath := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+               s.FileExists(metadataPath)
+
+               s.Equal(0, tbl.SortOrder().OrderID)
+               s.NoError(tc.cat.DropTable(ctx, tc.ident))
+       }
+}
+
+// TestCreateV1Table checks that creating a table with the "format-version"
+// property set to "1" yields version 1 metadata, sort order ID 0 and the
+// unpartitioned spec.
+func (s *SqliteCatalogTestSuite) TestCreateV1Table() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, catalog.NamespaceFromIdent(tc.ident), nil))
+
+               v1Props := catalog.WithProperties(iceberg.Properties{"format-version": "1"})
+               tbl, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested, v1Props)
+               s.Require().NoError(err)
+
+               metadataPath := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+               s.FileExists(metadataPath)
+               s.Equal(0, tbl.SortOrder().OrderID)
+               s.Equal(1, tbl.Metadata().Version())
+               s.True(tbl.Spec().Equals(*iceberg.UnpartitionedSpec))
+               s.NoError(tc.cat.DropTable(ctx, tc.ident))
+       }
+}
+
+// TestCreateTableCustomSortOrder checks that a sort order passed through
+// catalog.WithSortOrder is stored on the created table: order ID 1 with a
+// single ascending, nulls-first, identity-transform field.
+func (s *SqliteCatalogTestSuite) TestCreateTableCustomSortOrder() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, catalog.NamespaceFromIdent(tc.ident), nil))
+
+               requested := table.SortOrder{Fields: []table.SortField{
+                       {SourceID: 2, Transform: iceberg.IdentityTransform{}, NullOrder: table.NullsFirst},
+               }}
+
+               tbl, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested, catalog.WithSortOrder(requested))
+               s.Require().NoError(err)
+
+               metadataPath := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+               s.FileExists(metadataPath)
+               s.Equal(1, tbl.SortOrder().OrderID)
+               s.Len(tbl.SortOrder().Fields, 1)
+               s.Equal(table.SortASC, tbl.SortOrder().Fields[0].Direction)
+               s.Equal(table.NullsFirst, tbl.SortOrder().Fields[0].NullOrder)
+               s.Equal("identity", tbl.SortOrder().Fields[0].Transform.String())
+               s.NoError(tc.cat.DropTable(ctx, tc.ident))
+       }
+}
+
+// TestCreateDuplicatedTable checks that creating a table under an identifier
+// that already exists fails with an error containing "failed to create table".
+func (s *SqliteCatalogTestSuite) TestCreateDuplicatedTable() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, catalog.NamespaceFromIdent(tc.ident), nil))
+
+               v1Props := catalog.WithProperties(iceberg.Properties{"format-version": "1"})
+               _, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested, v1Props)
+               s.Require().NoError(err)
+
+               // Second creation with the same identifier must be rejected.
+               _, err = tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested)
+               s.ErrorContains(err, "failed to create table")
+       }
+}
+
+// TestCreateTableWithGivenLocation checks that a location passed through
+// catalog.WithLocation is used for the table, that the trailing "/" given to
+// WithLocation is not part of the reported location, and that the metadata
+// file lives under the warehouse.
+func (s *SqliteCatalogTestSuite) TestCreateTableWithGivenLocation() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               ns := catalog.NamespaceFromIdent(tc.ident)
+               name := catalog.TableNameFromIdent(tc.ident)
+
+               wantLocation := fmt.Sprintf("file://%s/%s.db/%s-given",
+                       s.warehouse, tc.cat.Name(), name)
+
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, ns, nil))
+               tbl, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested,
+                       catalog.WithLocation(wantLocation+"/"))
+               s.Require().NoError(err)
+
+               metadataLocation := tbl.MetadataLocation()
+               s.Equal(tc.ident, tbl.Identifier())
+               s.True(strings.HasPrefix(metadataLocation, "file://"+s.warehouse))
+               s.FileExists(strings.TrimPrefix(metadataLocation, "file://"))
+               s.Equal(wantLocation, tbl.Location())
+               s.NoError(tc.cat.DropTable(ctx, tc.ident))
+       }
+}
+
+// TestCreateTableWithoutNamespace checks that CreateTable with an identifier
+// made of a table name only fails with catalog.ErrNoSuchNamespace.
+func (s *SqliteCatalogTestSuite) TestCreateTableWithoutNamespace() {
+       catalogs := []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()}
+       bareIdent := table.Identifier{tableName()}
+
+       ctx := context.Background()
+       for _, cat := range catalogs {
+               _, err := cat.CreateTable(ctx, bareIdent, tableSchemaNested)
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestListTables creates one table in a flat namespace and one in a
+// hierarchical namespace, then checks that ListTables returns exactly the
+// matching table for each namespace and catalog.ErrNoSuchNamespace for an
+// unknown one.
+func (s *SqliteCatalogTestSuite) TestListTables() {
+       catalogs := []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()}
+       flatIdent, nestedIdent := s.randomTableIdentifier(), s.randomHierarchicalIdentifier()
+       flatNs := catalog.NamespaceFromIdent(flatIdent)
+       nestedNs := catalog.NamespaceFromIdent(nestedIdent)
+
+       ctx := context.Background()
+       for _, cat := range catalogs {
+               s.Require().NoError(cat.CreateNamespace(ctx, flatNs, nil))
+               s.Require().NoError(cat.CreateNamespace(ctx, nestedNs, nil))
+
+               _, err := cat.CreateTable(ctx, flatIdent, tableSchemaNested)
+               s.Require().NoError(err)
+               _, err = cat.CreateTable(ctx, nestedIdent, tableSchemaNested)
+               s.Require().NoError(err)
+
+               listed, err := cat.ListTables(ctx, flatNs)
+               s.Require().NoError(err)
+               s.Len(listed, 1)
+               s.Equal(flatIdent, listed[0])
+
+               listed, err = cat.ListTables(ctx, nestedNs)
+               s.Require().NoError(err)
+               s.Len(listed, 1)
+               s.Equal(nestedIdent, listed[0])
+
+               _, err = cat.ListTables(ctx, table.Identifier{"does_not_exist"})
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestListTablesMissingNamespace checks that ListTables reports
+// catalog.ErrNoSuchNamespace for namespaces that were never created, both for
+// a flat and for a hierarchical namespace name.
+func (s *SqliteCatalogTestSuite) TestListTablesMissingNamespace() {
+       tests := []struct {
+               cat       *sqlcat.Catalog
+               namespace table.Identifier
+       }{
+               {s.getCatalogMemory(), table.Identifier{databaseName()}},
+               {s.getCatalogSqlite(), strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       ctx := context.Background()
+       for _, tt := range tests {
+               // Exercise ListTables, as the test name says; calling
+               // ListNamespaces here would leave this ListTables path untested.
+               _, err := tt.cat.ListTables(ctx, tt.namespace)
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestListNamespaces creates a flat and a hierarchical namespace and checks
+// that ListNamespaces returns both when no parent is given, returns the
+// namespace itself when it is used as the parent, and fails with
+// catalog.ErrNoSuchNamespace for an unknown parent.
+func (s *SqliteCatalogTestSuite) TestListNamespaces() {
+       catalogs := []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()}
+       tbl1, tbl2 := s.randomTableIdentifier(), s.randomHierarchicalIdentifier()
+
+       for _, cat := range catalogs {
+               ctx := context.Background()
+               ns1, ns2 := catalog.NamespaceFromIdent(tbl1), catalog.NamespaceFromIdent(tbl2)
+               s.Require().NoError(cat.CreateNamespace(ctx, ns1, nil))
+               s.Require().NoError(cat.CreateNamespace(ctx, ns2, nil))
+
+               nslist, err := cat.ListNamespaces(ctx, nil)
+               s.Require().NoError(err)
+               s.Len(nslist, 2)
+               s.Contains(nslist, ns1)
+               s.Contains(nslist, ns2)
+
+               // Require the length before indexing so a wrong result fails the
+               // test cleanly instead of panicking on ns[0].
+               ns, err := cat.ListNamespaces(ctx, ns1)
+               s.Require().NoError(err)
+               s.Require().Len(ns, 1)
+               s.Equal(ns1, ns[0])
+
+               ns, err = cat.ListNamespaces(ctx, ns2)
+               s.Require().NoError(err)
+               s.Require().Len(ns, 1)
+               s.Equal(ns2, ns[0])
+
+               _, err = cat.ListNamespaces(ctx, table.Identifier{"does_not_exist"})
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+
+               // The listing query has no ORDER BY, so compare without relying
+               // on the order in which the database returns the rows.
+               ns, err = cat.ListNamespaces(ctx, nil)
+               s.Require().NoError(err)
+               s.ElementsMatch([]table.Identifier{ns1, ns2}, ns)
+       }
+}
+
+// TestLoadTableFromSelfIdentifier checks that a table can be loaded again
+// using the identifier reported by a previously loaded table, and that the
+// result matches the created table's identifier, metadata location and
+// metadata.
+func (s *SqliteCatalogTestSuite) TestLoadTableFromSelfIdentifier() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, catalog.NamespaceFromIdent(tc.ident), nil))
+
+               created, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested)
+               s.Require().NoError(err)
+
+               intermediate, err := tc.cat.LoadTable(ctx, tc.ident, nil)
+               s.Require().NoError(err)
+               s.Equal(intermediate.Identifier(), created.Identifier())
+
+               // Load once more, this time through the loaded table's own identifier.
+               loaded, err := tc.cat.LoadTable(ctx, intermediate.Identifier(), nil)
+               s.Require().NoError(err)
+
+               s.Equal(created.Identifier(), loaded.Identifier())
+               s.Equal(created.MetadataLocation(), loaded.MetadataLocation())
+               s.True(created.Metadata().Equals(loaded.Metadata()))
+       }
+}
+
+// TestDropTable checks that a dropped table can no longer be loaded, either
+// by the original identifier or by the identifier the created table reported.
+func (s *SqliteCatalogTestSuite) TestDropTable() {
+       type testCase struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }
+       cases := []testCase{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       ctx := context.Background()
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, catalog.NamespaceFromIdent(tc.ident), nil))
+
+               created, err := tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested)
+               s.Require().NoError(err)
+
+               s.Equal(tc.ident, created.Identifier())
+               s.NoError(tc.cat.DropTable(ctx, tc.ident))
+
+               _, err = tc.cat.LoadTable(ctx, tc.ident, nil)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+
+               _, err = tc.cat.LoadTable(ctx, created.Identifier(), nil)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+       }
+}
+
+// TestLoadTableNotExists checks that LoadTable fails with
+// catalog.ErrNoSuchTable for an identifier that was never created.
+func (s *SqliteCatalogTestSuite) TestLoadTableNotExists() {
+       ctx := context.Background()
+
+       for _, cat := range []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()} {
+               missing := s.randomTableIdentifier()
+               _, err := cat.LoadTable(ctx, missing, nil)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+       }
+}
+
+// TestLoadTableInvalidMetadata inserts an iceberg_tables row that has no
+// metadata location and checks that LoadTable reports catalog.ErrNoSuchTable
+// for it.
+func (s *SqliteCatalogTestSuite) TestLoadTableInvalidMetadata() {
+       const insertRow = `INSERT INTO iceberg_tables (catalog_name, table_namespace, table_name)
+                       VALUES ('default', 'default', 'invalid_metadata')`
+
+       db := s.getDB()
+       cat := s.loadCatalogForTableCreation()
+
+       _, err := db.Exec(insertRow)
+       s.Require().NoError(err)
+
+       ident := table.Identifier{"default", "invalid_metadata"}
+       _, err = cat.LoadTable(context.Background(), ident, nil)
+       s.ErrorIs(err, catalog.ErrNoSuchTable)
+}
+
+// TestDropTableNotExist checks that dropping a table that was never created
+// fails with catalog.ErrNoSuchTable on each catalog under test.
+func (s *SqliteCatalogTestSuite) TestDropTableNotExist() {
+       ctx := context.Background()
+
+       for _, c := range []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()} {
+               missing := s.randomTableIdentifier()
+
+               s.ErrorIs(c.DropTable(ctx, missing), catalog.ErrNoSuchTable)
+       }
+}
+
+// TestRenameTable renames a newly created table and checks that the returned
+// table carries the new identifier, keeps its metadata location, and is no
+// longer loadable under the old identifier.
+func (s *SqliteCatalogTestSuite) TestRenameTable() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               src table.Identifier
+               dst table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), src: s.randomTableIdentifier(), dst: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), src: s.randomHierarchicalIdentifier(), dst: s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tc := range cases {
+               // Both the source and the destination namespace must exist.
+               srcNs, dstNs := catalog.NamespaceFromIdent(tc.src), catalog.NamespaceFromIdent(tc.dst)
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, srcNs, nil))
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, dstNs, nil))
+
+               created, err := tc.cat.CreateTable(ctx, tc.src, tableSchemaNested)
+               s.Require().NoError(err)
+               s.Equal(tc.src, created.Identifier())
+
+               moved, err := tc.cat.RenameTable(ctx, tc.src, tc.dst)
+               s.Require().NoError(err)
+               s.Equal(tc.dst, moved.Identifier())
+               s.Equal(created.MetadataLocation(), moved.MetadataLocation())
+
+               _, err = tc.cat.LoadTable(ctx, tc.src, nil)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+       }
+}
+
+// TestRenameTableToExisting checks that renaming a table onto the identifier
+// of another existing table fails with catalog.ErrTableAlreadyExists.
+func (s *SqliteCatalogTestSuite) TestRenameTableToExisting() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               src table.Identifier
+               dst table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), src: s.randomTableIdentifier(), dst: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), src: s.randomHierarchicalIdentifier(), dst: s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tc := range cases {
+               srcNs, dstNs := catalog.NamespaceFromIdent(tc.src), catalog.NamespaceFromIdent(tc.dst)
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, srcNs, nil))
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, dstNs, nil))
+
+               // Create the source table first, then the table occupying the target.
+               for _, ident := range []table.Identifier{tc.src, tc.dst} {
+                       tbl, err := tc.cat.CreateTable(ctx, ident, tableSchemaNested)
+                       s.Require().NoError(err)
+                       s.Equal(ident, tbl.Identifier())
+               }
+
+               _, err := tc.cat.RenameTable(ctx, tc.src, tc.dst)
+               s.ErrorIs(err, catalog.ErrTableAlreadyExists)
+       }
+}
+
+// TestRenameMissingTable checks that renaming a table that was never created
+// fails with catalog.ErrNoSuchTable, even though the target namespace exists.
+func (s *SqliteCatalogTestSuite) TestRenameMissingTable() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               src table.Identifier
+               dst table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), src: s.randomTableIdentifier(), dst: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), src: s.randomHierarchicalIdentifier(), dst: s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tc := range cases {
+               // Only the destination namespace is created; the source table is not.
+               dstNs := catalog.NamespaceFromIdent(tc.dst)
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, dstNs, nil))
+
+               _, err := tc.cat.RenameTable(ctx, tc.src, tc.dst)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+       }
+}
+
+// TestRenameToMissingNamespace checks that renaming an existing table to an
+// identifier whose namespace was never created fails with
+// catalog.ErrNoSuchNamespace.
+func (s *SqliteCatalogTestSuite) TestRenameToMissingNamespace() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               src table.Identifier
+               dst table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), src: s.randomTableIdentifier(), dst: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), src: s.randomHierarchicalIdentifier(), dst: s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tc := range cases {
+               // Only the source namespace is created; the destination one is not.
+               srcNs := catalog.NamespaceFromIdent(tc.src)
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, srcNs, nil))
+
+               created, err := tc.cat.CreateTable(ctx, tc.src, tableSchemaNested)
+               s.Require().NoError(err)
+               s.Equal(tc.src, created.Identifier())
+
+               _, err = tc.cat.RenameTable(ctx, tc.src, tc.dst)
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestCreateDuplicateNamespace checks that creating the same namespace twice
+// fails the second time with catalog.ErrNamespaceAlreadyExists.
+func (s *SqliteCatalogTestSuite) TestCreateDuplicateNamespace() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               ns  table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ns: table.Identifier{databaseName()}},
+               {cat: s.getCatalogSqlite(), ns: strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, tc.ns, nil))
+
+               s.ErrorIs(tc.cat.CreateNamespace(ctx, tc.ns, nil), catalog.ErrNamespaceAlreadyExists)
+       }
+}
+
+// TestCreateNamespaceSharingPrefix checks that namespaces whose identifiers
+// share a common prefix can all be created without error.
+func (s *SqliteCatalogTestSuite) TestCreateNamespaceSharingPrefix() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               ns  table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ns: table.Identifier{databaseName()}},
+               {cat: s.getCatalogSqlite(), ns: strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       for _, tc := range cases {
+               // The identifier extended by one extra element "_1".
+               extended := append(tc.ns, "_1")
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, extended, nil))
+
+               // The identifier itself.
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, tc.ns, nil))
+
+               // The identifier with "_1" appended to its final element.
+               last := len(tc.ns) - 1
+               tc.ns[last] += "_1"
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, tc.ns, nil))
+       }
+}
+
+// TestCreateaNamespaceWithCommentAndLocation creates a namespace with
+// "comment" and "location" properties, checks that it is the only namespace
+// listed, and checks that both properties are returned when loaded back.
+func (s *SqliteCatalogTestSuite) TestCreateaNamespaceWithCommentAndLocation() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               ns  table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ns: table.Identifier{databaseName()}},
+               {cat: s.getCatalogSqlite(), ns: strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       for _, tc := range cases {
+               // A fresh property map is built for every catalog.
+               testLocation := "/test/location"
+               want := iceberg.Properties{
+                       "comment":  "this is a test description",
+                       "location": testLocation,
+               }
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, tc.ns, want))
+
+               listed, err := tc.cat.ListNamespaces(ctx, nil)
+               s.Require().NoError(err)
+               s.Len(listed, 1)
+               s.Equal(tc.ns, listed[0])
+
+               got, err := tc.cat.LoadNamespaceProperties(ctx, tc.ns)
+               s.Require().NoError(err)
+               s.Equal(want["comment"], got["comment"])
+               s.Equal(want["location"], got["location"])
+       }
+}
+
+// TestDropNamespace checks that a namespace holding a table cannot be dropped
+// (catalog.ErrNamespaceNotEmpty), and that once the table is dropped the
+// namespace can be dropped and no longer appears in the namespace listing.
+func (s *SqliteCatalogTestSuite) TestDropNamespace() {
+       ctx := context.Background()
+       cases := []struct {
+               cat   *sqlcat.Catalog
+               ident table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ident: s.randomTableIdentifier()},
+               {cat: s.getCatalogSqlite(), ident: s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tc := range cases {
+               nsIdent := catalog.NamespaceFromIdent(tc.ident)
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, nsIdent, nil))
+
+               listed, err := tc.cat.ListNamespaces(ctx, nil)
+               s.Require().NoError(err)
+               s.Len(listed, 1)
+               s.Equal(nsIdent, listed[0])
+
+               _, err = tc.cat.CreateTable(ctx, tc.ident, tableSchemaNested)
+               s.Require().NoError(err)
+
+               // While the table exists the namespace is not empty.
+               s.ErrorIs(tc.cat.DropNamespace(ctx, nsIdent), catalog.ErrNamespaceNotEmpty)
+
+               // Remove the table, then the namespace.
+               s.Require().NoError(tc.cat.DropTable(ctx, tc.ident))
+               s.Require().NoError(tc.cat.DropNamespace(ctx, nsIdent))
+
+               listed, err = tc.cat.ListNamespaces(ctx, nil)
+               s.Require().NoError(err)
+               s.Empty(listed)
+       }
+}
+
+// TestDropNamespaceNotExist checks that dropping a namespace that was never
+// created fails with catalog.ErrNoSuchNamespace.
+func (s *SqliteCatalogTestSuite) TestDropNamespaceNotExist() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               ns  table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ns: table.Identifier{databaseName()}},
+               {cat: s.getCatalogSqlite(), ns: strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       for _, tc := range cases {
+               s.ErrorIs(tc.cat.DropNamespace(ctx, tc.ns), catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestLoadEmptyNamespaceProperties creates a namespace with nil properties
+// and checks that loading its properties yields exactly {"exists": "true"}.
+func (s *SqliteCatalogTestSuite) TestLoadEmptyNamespaceProperties() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               ns  table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ns: table.Identifier{databaseName()}},
+               {cat: s.getCatalogSqlite(), ns: strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       for _, tc := range cases {
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, tc.ns, nil))
+
+               got, err := tc.cat.LoadNamespaceProperties(ctx, tc.ns)
+               s.Require().NoError(err)
+
+               want := iceberg.Properties{"exists": "true"}
+               s.Equal(want, got)
+       }
+}
+
+// TestLoadNamespacePropertiesNonExisting checks that loading the properties
+// of a namespace that was never created fails with catalog.ErrNoSuchNamespace.
+func (s *SqliteCatalogTestSuite) TestLoadNamespacePropertiesNonExisting() {
+       ctx := context.Background()
+
+       for _, c := range []*sqlcat.Catalog{s.getCatalogMemory(), s.getCatalogSqlite()} {
+               _, err := c.LoadNamespaceProperties(ctx, table.Identifier{"does_not_exist"})
+               s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+       }
+}
+
+// TestUpdateNamespaceProperties creates a namespace with a set of properties,
+// applies a combined removal/update request, and checks both the returned
+// report (Updated, Removed, Missing) and the properties loaded afterwards.
+func (s *SqliteCatalogTestSuite) TestUpdateNamespaceProperties() {
+       ctx := context.Background()
+       cases := []struct {
+               cat *sqlcat.Catalog
+               ns  table.Identifier
+       }{
+               {cat: s.getCatalogMemory(), ns: table.Identifier{databaseName()}},
+               {cat: s.getCatalogSqlite(), ns: strings.Split(hiearchicalNamespaceName(), ".")},
+       }
+
+       for _, tc := range cases {
+               // The maps and slices are rebuilt for every catalog.
+               warehouseLocation := "/test/location"
+               initial := iceberg.Properties{
+                       "comment":        "this is a test description",
+                       "location":       warehouseLocation,
+                       "test_property1": "1",
+                       "test_property2": "2",
+                       "test_property3": "3",
+               }
+
+               // "should_not_be_removed" is not among the initial properties.
+               toRemove := []string{"test_property1", "test_property2", "test_property3", "should_not_be_removed"}
+               toUpdate := iceberg.Properties{
+                       "test_property4": "4",
+                       "test_property5": "5",
+                       "comment":        "updated test description",
+               }
+
+               s.Require().NoError(tc.cat.CreateNamespace(ctx, tc.ns, initial))
+
+               report, err := tc.cat.UpdateNamespaceProperties(ctx, tc.ns, toRemove, toUpdate)
+               s.Require().NoError(err)
+
+               for key := range maps.Keys(toUpdate) {
+                       s.Contains(report.Updated, key)
+               }
+
+               for _, key := range toRemove {
+                       switch key {
+                       case "should_not_be_removed":
+                               s.Contains(report.Missing, key)
+                       default:
+                               s.Contains(report.Removed, key)
+                       }
+               }
+
+               got, err := tc.cat.LoadNamespaceProperties(ctx, tc.ns)
+               s.Require().NoError(err)
+
+               want := iceberg.Properties{
+                       "comment":        "updated test description",
+                       "location":       warehouseLocation,
+                       "test_property4": "4",
+                       "test_property5": "5",
+               }
+               s.Equal(want, got)
+       }
+}
+
+// TestSqlCatalog is the go test entry point that runs SqliteCatalogTestSuite.
+func TestSqlCatalog(t *testing.T) {
+       sqliteSuite := &SqliteCatalogTestSuite{}
+       suite.Run(t, sqliteSuite)
+}
diff --git a/go.mod b/go.mod
index c4b293f..2b74eea 100644
--- a/go.mod
+++ b/go.mod
@@ -36,7 +36,16 @@ require (
        github.com/stretchr/testify v1.10.0
        github.com/substrait-io/substrait-go/v3 v3.5.0
        github.com/twmb/murmur3 v1.1.8
+       github.com/uptrace/bun v1.2.8
+       github.com/uptrace/bun/dialect/mssqldialect v1.2.8
+       github.com/uptrace/bun/dialect/mysqldialect v1.2.8
+       github.com/uptrace/bun/dialect/oracledialect v1.2.8
+       github.com/uptrace/bun/dialect/pgdialect v1.2.8
+       github.com/uptrace/bun/dialect/sqlitedialect v1.2.8
+       github.com/uptrace/bun/driver/sqliteshim v1.2.8
+       github.com/uptrace/bun/extra/bundebug v1.2.8
        gocloud.dev v0.40.0
+       golang.org/x/exp v0.0.0-20250103183323-7d7fa50e5329
        golang.org/x/sync v0.10.0
        google.golang.org/api v0.218.0
        gopkg.in/yaml.v3 v3.0.1
@@ -75,7 +84,8 @@ require (
        github.com/containerd/console v1.0.3 // indirect
        github.com/creasty/defaults v1.8.0 // indirect
        github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/fatih/color v1.15.0 // indirect
+       github.com/dustin/go-humanize v1.0.1 // indirect
+       github.com/fatih/color v1.18.0 // indirect
        github.com/felixge/httpsnoop v1.0.4 // indirect
        github.com/go-logr/logr v1.4.2 // indirect
        github.com/go-logr/stdr v1.2.2 // indirect
@@ -89,6 +99,8 @@ require (
        github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
        github.com/googleapis/gax-go/v2 v2.14.1 // indirect
        github.com/gookit/color v1.5.4 // indirect
+       github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
+       github.com/jinzhu/inflection v1.0.0 // indirect
        github.com/jmespath/go-jmespath v0.4.0 // indirect
        github.com/json-iterator/go v1.1.12 // indirect
        github.com/klauspost/asmfmt v1.3.2 // indirect
@@ -98,16 +110,23 @@ require (
        github.com/mattn/go-colorable v0.1.13 // indirect
        github.com/mattn/go-isatty v0.0.20 // indirect
        github.com/mattn/go-runewidth v0.0.16 // indirect
+       github.com/mattn/go-sqlite3 v1.14.24 // indirect
        github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // 
indirect
        github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
        github.com/mitchellh/mapstructure v1.5.0 // indirect
        github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
        github.com/modern-go/reflect2 v1.0.2 // indirect
+       github.com/ncruces/go-strftime v0.1.9 // indirect
        github.com/pierrec/lz4/v4 v4.1.22 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
+       github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
+       github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // 
indirect
        github.com/rivo/uniseg v0.4.7 // indirect
        github.com/stretchr/objx v0.5.2 // indirect
        github.com/substrait-io/substrait v0.63.1 // indirect
+       github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
+       github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
+       github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
        github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
        github.com/zeebo/xxh3 v1.0.2 // indirect
        go.opencensus.io v0.24.0 // indirect
@@ -117,7 +136,6 @@ require (
        go.opentelemetry.io/otel/metric v1.31.0 // indirect
        go.opentelemetry.io/otel/trace v1.31.0 // indirect
        golang.org/x/crypto v0.32.0 // indirect
-       golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
        golang.org/x/mod v0.22.0 // indirect
        golang.org/x/net v0.34.0 // indirect
        golang.org/x/oauth2 v0.25.0 // indirect
@@ -132,4 +150,11 @@ require (
        google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250115164207-1a7da9e5054f // indirect
        google.golang.org/grpc v1.69.4 // indirect
        google.golang.org/protobuf v1.36.3 // indirect
+       modernc.org/gc/v3 v3.0.0-20250105121824-520be1a3aee6 // indirect
+       modernc.org/libc v1.61.6 // indirect
+       modernc.org/mathutil v1.7.1 // indirect
+       modernc.org/memory v1.8.1 // indirect
+       modernc.org/sqlite v1.34.4 // indirect
+       modernc.org/strutil v1.2.1 // indirect
+       modernc.org/token v1.1.0 // indirect
 )
diff --git a/go.sum b/go.sum
index db772ca..d91f228 100644
--- a/go.sum
+++ b/go.sum
@@ -98,12 +98,14 @@ github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 
h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod 
h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/dustin/go-humanize v1.0.1 
h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
+github.com/dustin/go-humanize v1.0.1/go.mod 
h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod 
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane 
v0.9.1-0.20191026205805-5f8ba28d4473/go.mod 
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod 
h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
-github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
-github.com/fatih/color v1.15.0/go.mod 
h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
+github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
+github.com/fatih/color v1.18.0/go.mod 
h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
 github.com/felixge/httpsnoop v1.0.4 
h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
 github.com/felixge/httpsnoop v1.0.4/go.mod 
h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/go-logr/logr v1.2.2/go.mod 
h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@@ -156,6 +158,8 @@ github.com/google/go-replayers/httpreplay v1.2.0/go.mod 
h1:WahEFFZZ7a1P4VM1qEeHy
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian/v3 v3.3.3 
h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc=
 github.com/google/martian/v3 v3.3.3/go.mod 
h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0=
+github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd 
h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
+github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod 
h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
 github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
 github.com/google/s2a-go v0.1.9/go.mod 
h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
 github.com/google/subcommands v1.2.0/go.mod 
h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
@@ -174,6 +178,10 @@ github.com/gookit/color v1.5.4 
h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
 github.com/gookit/color v1.5.4/go.mod 
h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w=
 github.com/hamba/avro/v2 v2.27.0 
h1:IAM4lQ0VzUIKBuo4qlAiLKfqALSrFC+zi1iseTtbBKU=
 github.com/hamba/avro/v2 v2.27.0/go.mod 
h1:jN209lopfllfrz7IGoZErlDz+AyUJ3vrBePQFZwYf5I=
+github.com/hashicorp/golang-lru/v2 v2.0.7 
h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod 
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
+github.com/jinzhu/inflection v1.0.0 
h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod 
h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
 github.com/jmespath/go-jmespath v0.4.0 
h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
 github.com/jmespath/go-jmespath v0.4.0/go.mod 
h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
 github.com/jmespath/go-jmespath/internal/testify v1.5.1 
h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
@@ -210,6 +218,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod 
h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
 github.com/mattn/go-runewidth v0.0.13/go.mod 
h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
 github.com/mattn/go-runewidth v0.0.16 
h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
 github.com/mattn/go-runewidth v0.0.16/go.mod 
h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
+github.com/mattn/go-sqlite3 v1.14.24 
h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
+github.com/mattn/go-sqlite3 v1.14.24/go.mod 
h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 
h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod 
h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 
h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
@@ -221,6 +231,8 @@ github.com/modern-go/concurrent 
v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/ncruces/go-strftime v0.1.9 
h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
+github.com/ncruces/go-strftime v0.1.9/go.mod 
h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
 github.com/pierrec/lz4/v4 v4.1.22 
h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
 github.com/pierrec/lz4/v4 v4.1.22/go.mod 
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -235,6 +247,10 @@ github.com/pterm/pterm v0.12.36/go.mod 
h1:NjiL09hFhT/vWjQHSj1athJpx6H8cjpHXNAK5b
 github.com/pterm/pterm v0.12.40/go.mod 
h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkGTYf8s=
 github.com/pterm/pterm v0.12.80 h1:mM55B+GnKUnLMUSqhdINe4s6tOuVQIetQ3my8JGyAIg=
 github.com/pterm/pterm v0.12.80/go.mod 
h1:c6DeF9bSnOSeFPZlfs4ZRAFcf5SCoTwvwQ5xaKGQlHo=
+github.com/puzpuzpuz/xsync/v3 v3.4.0 
h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
+github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod 
h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec 
h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod 
h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/rivo/uniseg v0.2.0/go.mod 
h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
 github.com/rivo/uniseg v0.4.7/go.mod 
h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
@@ -260,8 +276,30 @@ github.com/substrait-io/substrait v0.63.1 
h1:XNPvrEYNPjDqenK4TxqBDDUNzglafdjzjej
 github.com/substrait-io/substrait v0.63.1/go.mod 
h1:MPFNw6sToJgpD5Z2rj0rQrdP/Oq8HG7Z2t3CAEHtkHw=
 github.com/substrait-io/substrait-go/v3 v3.5.0 
h1:sQqMGtXDzDC3dyCBL0+efnM/E5UbmDVZlOCHdnx+F+k=
 github.com/substrait-io/substrait-go/v3 v3.5.0/go.mod 
h1:S+yTQwCuWtTe5SD24JJ9S8x8Oae8h9rVDCpZlS9EV5A=
+github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc 
h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo=
+github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod 
h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs=
 github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
 github.com/twmb/murmur3 v1.1.8/go.mod 
h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
+github.com/uptrace/bun v1.2.8 h1:HEiLvy9wc7ehU5S02+O6NdV5BLz48lL4REPhTkMX3Dg=
+github.com/uptrace/bun v1.2.8/go.mod 
h1:JBq0uBKsKqNT0Ccce1IAFZY337Wkf08c6F6qlmfOHE8=
+github.com/uptrace/bun/dialect/mssqldialect v1.2.8 
h1:rNvhiMpOTrCub2E2e4eiAai/mNkWrfF6CrZnXJ2HQ58=
+github.com/uptrace/bun/dialect/mssqldialect v1.2.8/go.mod 
h1:08TBgE135eqYpl0PEA78D7gcS0Hxu74Onmo/SIApPkU=
+github.com/uptrace/bun/dialect/mysqldialect v1.2.8 
h1:0OBLIbxptPwXdkjDk+PNAyNKhs8d1m3QLNg5i+VIkl4=
+github.com/uptrace/bun/dialect/mysqldialect v1.2.8/go.mod 
h1:P/eOt7YkTM1k6ik/s9/jXv6QW9mljx3TVNLyrXgcikM=
+github.com/uptrace/bun/dialect/oracledialect v1.2.8 
h1:sKqhHZwp0zEiS49NBqk1OYiO9Puju9w7FJaKO1FaiqM=
+github.com/uptrace/bun/dialect/oracledialect v1.2.8/go.mod 
h1:prMyvaCvbP0ZDRLtjyHCOLk8hQrDRpjD+2gkyc9BBAc=
+github.com/uptrace/bun/dialect/pgdialect v1.2.8 
h1:9n3qVh6yc+u7F3lpXzsWrAFJG1yLHUC2thjCCVEDpM8=
+github.com/uptrace/bun/dialect/pgdialect v1.2.8/go.mod 
h1:plksD43MjAlPGYLD9/SzsLUpGH5poXE9IB1+ka/sEzE=
+github.com/uptrace/bun/dialect/sqlitedialect v1.2.8 
h1:Huqw7YhLFTbocbSv8NETYYXqKtwLa6XsciCWtjzWSWU=
+github.com/uptrace/bun/dialect/sqlitedialect v1.2.8/go.mod 
h1:ni7h2uwIc5zPhxgmCMTEbefONc4XsVr/ATfz1Q7d3CE=
+github.com/uptrace/bun/driver/sqliteshim v1.2.8 
h1:J7L/cSei7RAP0kjmpjekQHVvwl86VgNFVM0QONdE22w=
+github.com/uptrace/bun/driver/sqliteshim v1.2.8/go.mod 
h1:/JehwIy8P0fmgbOz3r6guEgg1Uxh18ScaOaVtnfEVvc=
+github.com/uptrace/bun/extra/bundebug v1.2.8 
h1:Epv0ycLOnoKWPky+rufP2F/PrcSlKkd4tmVIFOdq90A=
+github.com/uptrace/bun/extra/bundebug v1.2.8/go.mod 
h1:ucnmuPw/5ePbNFj2SPmV0lQh3ZvL+3HCrpvRxIYZyWQ=
+github.com/vmihailenco/msgpack/v5 v5.4.1 
h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
+github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod 
h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
+github.com/vmihailenco/tagparser/v2 v2.0.0 
h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
+github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod 
h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
 github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod 
h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
 github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e 
h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
 github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod 
h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
@@ -298,8 +336,8 @@ golang.org/x/crypto v0.18.0/go.mod 
h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
 golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
 golang.org/x/crypto v0.32.0/go.mod 
h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
-golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 
h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
-golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod 
h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
+golang.org/x/exp v0.0.0-20250103183323-7d7fa50e5329 
h1:9kj3STMvgqy3YA4VQXBrN7925ICMxD5wzMRcgA30588=
+golang.org/x/exp v0.0.0-20250103183323-7d7fa50e5329/go.mod 
h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod 
h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
 golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod 
h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod 
h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -440,3 +478,29 @@ gopkg.in/yaml.v3 v3.0.1 
h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+modernc.org/cc/v4 v4.24.2 h1:uektamHbSXU7egelXcyVpMaaAsrRH4/+uMKUQAQUdOw=
+modernc.org/cc/v4 v4.24.2/go.mod 
h1:T1lKJZhXIi2VSqGBiB4LIbKs9NsKTbUXj4IDrmGqtTI=
+modernc.org/ccgo/v4 v4.23.5 h1:6uAwu8u3pnla3l/+UVUrDDO1HIGxHTYmFH6w+X9nsyw=
+modernc.org/ccgo/v4 v4.23.5/go.mod 
h1:FogrWfBdzqLWm1ku6cfr4IzEFouq2fSAPf6aSAHdAJQ=
+modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
+modernc.org/fileutil v1.3.0/go.mod 
h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
+modernc.org/gc/v2 v2.6.0 h1:Tiw3pezQj7PfV8k4Dzyu/vhRHR2e92kOXtTFU8pbCl4=
+modernc.org/gc/v2 v2.6.0/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU=
+modernc.org/gc/v3 v3.0.0-20250105121824-520be1a3aee6 
h1:JoKwHjIFumiKrjMbp1cNbC5E9UyCgA/ZcID0xOWQ2N8=
+modernc.org/gc/v3 v3.0.0-20250105121824-520be1a3aee6/go.mod 
h1:LG5UO1Ran4OO0JRKz2oNiXhR5nNrgz0PzH7UKhz0aMU=
+modernc.org/libc v1.61.6 h1:L2jW0wxHPCyHK0YSHaGaVlY0WxjpG/TTVdg6gRJOPqw=
+modernc.org/libc v1.61.6/go.mod h1:G+DzuaCcReUYYg4nNSfigIfTDCENdj9EByglvaRx53A=
+modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
+modernc.org/mathutil v1.7.1/go.mod 
h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
+modernc.org/memory v1.8.1 h1:HS1HRg1jEohnuONobEq2WrLEhLyw8+J42yLFTnllm2A=
+modernc.org/memory v1.8.1/go.mod 
h1:ZbjSvMO5NQ1A2i3bWeDiVMxIorXwdClKE/0SZ+BMotU=
+modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
+modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
+modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
+modernc.org/sortutil v1.2.0/go.mod 
h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
+modernc.org/sqlite v1.34.4 h1:sjdARozcL5KJBvYQvLlZEmctRgW9xqIZc2ncN7PU0P8=
+modernc.org/sqlite v1.34.4/go.mod 
h1:3QQFCG2SEMtc2nv+Wq4cQCH7Hjcg+p/RMlS1XK+zwbk=
+modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
+modernc.org/strutil v1.2.1/go.mod 
h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
+modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
+modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
diff --git a/types.go b/types.go
index 37723e8..cc9f1b0 100644
--- a/types.go
+++ b/types.go
@@ -44,6 +44,17 @@ func (p Properties) Get(key, defVal string) string {
        return defVal
 }
 
+// GetBool returns the value stored under key parsed with strconv.ParseBool.
+// defVal is returned when key is absent or when its value does not parse as
+// a boolean.
+func (p Properties) GetBool(key string, defVal bool) bool {
+       raw, found := p[key]
+       if !found {
+               return defVal
+       }
+
+       if parsed, err := strconv.ParseBool(raw); err == nil {
+               return parsed
+       }
+
+       // Unparsable values fall back to the default instead of failing.
+       return defVal
+}
+
 // Type is an interface representing any of the available iceberg types,
 // such as primitives (int32/int64/etc.) or nested types (list/struct/map).
 type Type interface {

Reply via email to