This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9927572f6 Python: Add REST catalog implementation (#5287)
9927572f6 is described below

commit 9927572f648fd7c353793949c030dc388fa28645
Author: Fokko Driesprong <[email protected]>
AuthorDate: Sat Jul 30 21:40:53 2022 +0200

    Python: Add REST catalog implementation (#5287)
    
    This does not implement table commits, but does implement the catalog 
portion of the REST API.
---
 .github/workflows/open-api.yml               |   2 +-
 python/.pre-commit-config.yaml               |   2 +-
 python/poetry.lock                           | 256 +++++------
 python/pyiceberg/catalog/base.py             |  19 +-
 python/pyiceberg/catalog/rest.py             | 434 +++++++++++++++++++
 python/pyiceberg/exceptions.py               |  48 ++-
 python/pyiceberg/schema.py                   |   2 +
 python/pyiceberg/table/base.py               |  17 +-
 python/pyiceberg/table/metadata.py           |  19 +-
 python/pyiceberg/table/partitioning.py       |   5 +-
 python/pyiceberg/utils/iceberg_base_model.py |   1 +
 python/pyproject.toml                        |   7 +
 python/tests/catalog/test_base.py            |  39 +-
 python/tests/catalog/test_rest.py            | 612 +++++++++++++++++++++++++++
 python/tests/conftest.py                     |   2 +-
 python/tests/table/test_metadata.py          |  56 ++-
 python/tests/test_schema.py                  |   4 +-
 python/tests/utils/test_bin_packing.py       |  15 -
 18 files changed, 1323 insertions(+), 217 deletions(-)

diff --git a/.github/workflows/open-api.yml b/.github/workflows/open-api.yml
index e1703ce47..404eb13c2 100644
--- a/.github/workflows/open-api.yml
+++ b/.github/workflows/open-api.yml
@@ -45,4 +45,4 @@ jobs:
           python-version: 3.9
       - name: Install
         working-directory: ./open-api
-        run: pip install openapi-spec-validator && openapi-spec-validator 
rest-catalog-open-api.yaml
\ No newline at end of file
+        run: pip install openapi-spec-validator && openapi-spec-validator 
rest-catalog-open-api.yaml
diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml
index 9a219be8c..717b42978 100644
--- a/python/.pre-commit-config.yaml
+++ b/python/.pre-commit-config.yaml
@@ -40,7 +40,7 @@ repos:
     rev: v0.971
     hooks:
       - id: mypy
-        args: [--config=python/pyproject.toml]
+        args: [ --install-types, --non-interactive, 
--config=python/pyproject.toml]
   - repo: https://github.com/hadialqattan/pycln
     rev: v2.0.4
     hooks:
diff --git a/python/poetry.lock b/python/poetry.lock
index 2296e0362..d95bd8729 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -20,6 +20,14 @@ docs = ["furo", "sphinx", "zope.interface", 
"sphinx-notfound-page"]
 tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest 
(>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "zope.interface", 
"cloudpickle"]
 tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest 
(>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "cloudpickle"]
 
+[[package]]
+name = "certifi"
+version = "2022.6.15"
+description = "Python package for providing Mozilla's CA Bundle."
+category = "main"
+optional = false
+python-versions = ">=3.6"
+
 [[package]]
 name = "cffi"
 version = "1.15.1"
@@ -39,6 +47,17 @@ category = "dev"
 optional = false
 python-versions = ">=3.6.1"
 
+[[package]]
+name = "charset-normalizer"
+version = "2.1.0"
+description = "The Real First Universal Charset Detector. Open, modern and 
actively maintained alternative to Chardet."
+category = "main"
+optional = false
+python-versions = ">=3.6.0"
+
+[package.extras]
+unicode_backport = ["unicodedata2"]
+
 [[package]]
 name = "colorama"
 version = "0.4.5"
@@ -114,6 +133,14 @@ python-versions = ">=3.7"
 [package.extras]
 license = ["ukkonen"]
 
+[[package]]
+name = "idna"
+version = "3.3"
+description = "Internationalized Domain Names in Applications (IDNA)"
+category = "main"
+optional = false
+python-versions = ">=3.5"
+
 [[package]]
 name = "importlib-metadata"
 version = "4.12.0"
@@ -205,8 +232,8 @@ optional = false
 python-versions = ">=3.6"
 
 [package.extras]
-testing = ["pytest-benchmark", "pytest"]
-dev = ["tox", "pre-commit"]
+dev = ["pre-commit", "tox"]
+testing = ["pytest", "pytest-benchmark"]
 
 [[package]]
 name = "pre-commit"
@@ -331,6 +358,40 @@ category = "dev"
 optional = false
 python-versions = ">=3.6"
 
+[[package]]
+name = "requests"
+version = "2.28.1"
+description = "Python HTTP for Humans."
+category = "main"
+optional = false
+python-versions = ">=3.7, <4"
+
+[package.dependencies]
+certifi = ">=2017.4.17"
+charset-normalizer = ">=2,<3"
+idna = ">=2.5,<4"
+urllib3 = ">=1.21.1,<1.27"
+
+[package.extras]
+socks = ["PySocks (>=1.5.6,!=1.5.7)"]
+use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"]
+
+[[package]]
+name = "requests-mock"
+version = "1.9.3"
+description = "Mock out responses from the requests package"
+category = "dev"
+optional = false
+python-versions = "*"
+
+[package.dependencies]
+requests = ">=2.3,<3"
+six = "*"
+
+[package.extras]
+fixture = ["fixtures"]
+test = ["fixtures", "mock", "purl", "pytest", "sphinx", "testrepository 
(>=0.0.18)", "testtools"]
+
 [[package]]
 name = "six"
 version = "1.16.0"
@@ -363,6 +424,19 @@ category = "main"
 optional = false
 python-versions = ">=3.7"
 
+[[package]]
+name = "urllib3"
+version = "1.26.10"
+description = "HTTP library with thread-safe connection pooling, file post, 
and more."
+category = "main"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, 
!=3.5.*, <4"
+
+[package.extras]
+brotli = ["brotlicffi (>=0.8.0)", "brotli (>=1.0.9)", "brotlipy (>=0.6.0)"]
+secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", 
"certifi", "ipaddress"]
+socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
+
 [[package]]
 name = "virtualenv"
 version = "20.15.1"
@@ -415,16 +489,18 @@ snappy = ["python-snappy"]
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.8"
-content-hash = 
"906929f3e98c56eb63e4175d071dd366746751e9357a45d18a223d68ece38498"
+content-hash = 
"ff76c4a6349ee3a909543aa6587ace0ab6795e08e97e892a90553b6fd6a589af"
 
 [metadata.files]
-atomicwrites = [
-    {file = "atomicwrites-1.4.1.tar.gz", hash = 
"sha256:81b2c9071a49367a7f770170e5eec8cb66567cfbbc8c73d20ce5ca4a8d71cf11"},
-]
+atomicwrites = []
 attrs = [
     {file = "attrs-21.4.0-py2.py3-none-any.whl", hash = 
"sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"},
     {file = "attrs-21.4.0.tar.gz", hash = 
"sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"},
 ]
+certifi = [
+    {file = "certifi-2022.6.15-py3-none-any.whl", hash = 
"sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412"},
+    {file = "certifi-2022.6.15.tar.gz", hash = 
"sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d"},
+]
 cffi = [
     {file = "cffi-1.15.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = 
"sha256:a66d3508133af6e8548451b25058d5812812ec3798c886bf38ed24a98216fab2"},
     {file = "cffi-1.15.1-cp27-cp27m-manylinux1_i686.whl", hash = 
"sha256:470c103ae716238bbe698d67ad020e1db9d9dba34fa5a899b5e21577e6d52ed2"},
@@ -495,87 +571,29 @@ cfgv = [
     {file = "cfgv-3.3.1-py2.py3-none-any.whl", hash = 
"sha256:c6a0883f3917a037485059700b9e75da2464e6c27051014ad85ba6aaa5884426"},
     {file = "cfgv-3.3.1.tar.gz", hash = 
"sha256:f5a830efb9ce7a445376bb66ec94c638a9787422f96264c98edc6bdeed8ab736"},
 ]
+charset-normalizer = [
+    {file = "charset-normalizer-2.1.0.tar.gz", hash = 
"sha256:575e708016ff3a5e3681541cb9d79312c416835686d054a23accb873b254f413"},
+    {file = "charset_normalizer-2.1.0-py3-none-any.whl", hash = 
"sha256:5189b6f22b01957427f35b6a08d9a0bc45b46d3788ef5a92e978433c7a35f8a5"},
+]
 colorama = [
     {file = "colorama-0.4.5-py2.py3-none-any.whl", hash = 
"sha256:854bf444933e37f5824ae7bfc1e98d5bce2ebe4160d46b5edf346a89358e99da"},
     {file = "colorama-0.4.5.tar.gz", hash = 
"sha256:e6c6b4334fc50988a639d9b98aa429a0b57da6e17b9a44f0451f930b6967b7a4"},
 ]
-coverage = [
-    {file = "coverage-6.4.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = 
"sha256:a9032f9b7d38bdf882ac9f66ebde3afb8145f0d4c24b2e600bc4c6304aafb87e"},
-    {file = "coverage-6.4.2-cp310-cp310-macosx_11_0_arm64.whl", hash = 
"sha256:e0524adb49c716ca763dbc1d27bedce36b14f33e6b8af6dba56886476b42957c"},
-    {file = 
"coverage-6.4.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:d4548be38a1c810d79e097a38107b6bf2ff42151900e47d49635be69943763d8"},
-    {file = 
"coverage-6.4.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:f23876b018dfa5d3e98e96f5644b109090f16a4acb22064e0f06933663005d39"},
-    {file = 
"coverage-6.4.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:6fe75dcfcb889b6800f072f2af5a331342d63d0c1b3d2bf0f7b4f6c353e8c9c0"},
-    {file = "coverage-6.4.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = 
"sha256:2f8553878a24b00d5ab04b7a92a2af50409247ca5c4b7a2bf4eabe94ed20d3ee"},
-    {file = "coverage-6.4.2-cp310-cp310-musllinux_1_1_i686.whl", hash = 
"sha256:d774d9e97007b018a651eadc1b3970ed20237395527e22cbeb743d8e73e0563d"},
-    {file = "coverage-6.4.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = 
"sha256:d56f105592188ce7a797b2bd94b4a8cb2e36d5d9b0d8a1d2060ff2a71e6b9bbc"},
-    {file = "coverage-6.4.2-cp310-cp310-win32.whl", hash = 
"sha256:d230d333b0be8042ac34808ad722eabba30036232e7a6fb3e317c49f61c93386"},
-    {file = "coverage-6.4.2-cp310-cp310-win_amd64.whl", hash = 
"sha256:5ef42e1db047ca42827a85e34abe973971c635f83aed49611b7f3ab49d0130f0"},
-    {file = "coverage-6.4.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = 
"sha256:25b7ec944f114f70803d6529394b64f8749e93cbfac0fe6c5ea1b7e6c14e8a46"},
-    {file = 
"coverage-6.4.2-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:7bb00521ab4f99fdce2d5c05a91bddc0280f0afaee0e0a00425e28e209d4af07"},
-    {file = 
"coverage-6.4.2-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:2dff52b3e7f76ada36f82124703f4953186d9029d00d6287f17c68a75e2e6039"},
-    {file = 
"coverage-6.4.2-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:147605e1702d996279bb3cc3b164f408698850011210d133a2cb96a73a2f7996"},
-    {file = "coverage-6.4.2-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = 
"sha256:422fa44070b42fef9fb8dabd5af03861708cdd6deb69463adc2130b7bf81332f"},
-    {file = "coverage-6.4.2-cp37-cp37m-musllinux_1_1_i686.whl", hash = 
"sha256:8af6c26ba8df6338e57bedbf916d76bdae6308e57fc8f14397f03b5da8622b4e"},
-    {file = "coverage-6.4.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = 
"sha256:5336e0352c0b12c7e72727d50ff02557005f79a0b8dcad9219c7c4940a930083"},
-    {file = "coverage-6.4.2-cp37-cp37m-win32.whl", hash = 
"sha256:0f211df2cba951ffcae210ee00e54921ab42e2b64e0bf2c0befc977377fb09b7"},
-    {file = "coverage-6.4.2-cp37-cp37m-win_amd64.whl", hash = 
"sha256:a13772c19619118903d65a91f1d5fea84be494d12fd406d06c849b00d31bf120"},
-    {file = "coverage-6.4.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = 
"sha256:f7bd0ffbcd03dc39490a1f40b2669cc414fae0c4e16b77bb26806a4d0b7d1452"},
-    {file = "coverage-6.4.2-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:0895ea6e6f7f9939166cc835df8fa4599e2d9b759b02d1521b574e13b859ac32"},
-    {file = 
"coverage-6.4.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:d4e7ced84a11c10160c0697a6cc0b214a5d7ab21dfec1cd46e89fbf77cc66fae"},
-    {file = 
"coverage-6.4.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:80db4a47a199c4563d4a25919ff29c97c87569130375beca3483b41ad5f698e8"},
-    {file = 
"coverage-6.4.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:3def6791adf580d66f025223078dc84c64696a26f174131059ce8e91452584e1"},
-    {file = "coverage-6.4.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = 
"sha256:4f89d8e03c8a3757aae65570d14033e8edf192ee9298303db15955cadcff0c63"},
-    {file = "coverage-6.4.2-cp38-cp38-musllinux_1_1_i686.whl", hash = 
"sha256:6d0b48aff8e9720bdec315d67723f0babd936a7211dc5df453ddf76f89c59933"},
-    {file = "coverage-6.4.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = 
"sha256:2b20286c2b726f94e766e86a3fddb7b7e37af5d0c635bdfa7e4399bc523563de"},
-    {file = "coverage-6.4.2-cp38-cp38-win32.whl", hash = 
"sha256:d714af0bdba67739598849c9f18efdcc5a0412f4993914a0ec5ce0f1e864d783"},
-    {file = "coverage-6.4.2-cp38-cp38-win_amd64.whl", hash = 
"sha256:5f65e5d3ff2d895dab76b1faca4586b970a99b5d4b24e9aafffc0ce94a6022d6"},
-    {file = "coverage-6.4.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:a697977157adc052284a7160569b36a8bbec09db3c3220642e6323b47cec090f"},
-    {file = "coverage-6.4.2-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:c77943ef768276b61c96a3eb854eba55633c7a3fddf0a79f82805f232326d33f"},
-    {file = 
"coverage-6.4.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:54d8d0e073a7f238f0666d3c7c0d37469b2aa43311e4024c925ee14f5d5a1cbe"},
-    {file = 
"coverage-6.4.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:f22325010d8824594820d6ce84fa830838f581a7fd86a9235f0d2ed6deb61e29"},
-    {file = 
"coverage-6.4.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:24b04d305ea172ccb21bee5bacd559383cba2c6fcdef85b7701cf2de4188aa55"},
-    {file = "coverage-6.4.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = 
"sha256:866ebf42b4c5dbafd64455b0a1cd5aa7b4837a894809413b930026c91e18090b"},
-    {file = "coverage-6.4.2-cp39-cp39-musllinux_1_1_i686.whl", hash = 
"sha256:e36750fbbc422c1c46c9d13b937ab437138b998fe74a635ec88989afb57a3978"},
-    {file = "coverage-6.4.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = 
"sha256:79419370d6a637cb18553ecb25228893966bd7935a9120fa454e7076f13b627c"},
-    {file = "coverage-6.4.2-cp39-cp39-win32.whl", hash = 
"sha256:b5e28db9199dd3833cc8a07fa6cf429a01227b5d429facb56eccd765050c26cd"},
-    {file = "coverage-6.4.2-cp39-cp39-win_amd64.whl", hash = 
"sha256:edfdabe7aa4f97ed2b9dd5dde52d2bb29cb466993bb9d612ddd10d0085a683cf"},
-    {file = "coverage-6.4.2-pp36.pp37.pp38-none-any.whl", hash = 
"sha256:e2618cb2cf5a7cc8d698306e42ebcacd02fb7ef8cfc18485c59394152c70be97"},
-    {file = "coverage-6.4.2.tar.gz", hash = 
"sha256:6c3ccfe89c36f3e5b9837b9ee507472310164f352c9fe332120b764c9d60adbe"},
-]
-distlib = [
-    {file = "distlib-0.3.5-py2.py3-none-any.whl", hash = 
"sha256:b710088c59f06338ca514800ad795a132da19fda270e3ce4affc74abf955a26c"},
-    {file = "distlib-0.3.5.tar.gz", hash = 
"sha256:a7f75737c70be3b25e2bee06288cec4e4c221de18455b2dd037fe2a795cab2fe"},
-]
+coverage = []
+distlib = []
 docutils = [
     {file = "docutils-0.19-py3-none-any.whl", hash = 
"sha256:5e1de4d849fee02c63b040a4a3fd567f4ab104defd8a5511fbbc24a8a017efbc"},
     {file = "docutils-0.19.tar.gz", hash = 
"sha256:33995a6753c30b7f577febfc2c50411fec6aac7f7ffeb7c4cfe5991072dcf9e6"},
 ]
-fastavro = [
-    {file = "fastavro-1.5.3-cp310-cp310-macosx_10_14_x86_64.whl", hash = 
"sha256:bcaf1c9ead0aa1c7306cff229f2c207d714f7f765e0bb010e64c74d379008555"},
-    {file = 
"fastavro-1.5.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:8ce66549bdbc9d43f40e1858796c1db59fa9f63f2368217071929c39f7ee7c1e"},
-    {file = 
"fastavro-1.5.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:2c14c72f79bcb689edea06ec276bfd5c4d4f17b8e32f545e32a6ee108c5f09de"},
-    {file = "fastavro-1.5.3-cp310-cp310-win_amd64.whl", hash = 
"sha256:6ad6af14d5e927e0c5fbc5dcb7c7030ef8759c8011620344a57252c263d6dd5a"},
-    {file = "fastavro-1.5.3-cp37-cp37m-macosx_10_15_x86_64.whl", hash = 
"sha256:3a4fa9f72c9b134fcdb0ee30d4a838f1d30656ece72a95f18cc74db10148ee55"},
-    {file = 
"fastavro-1.5.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:f8980f2bc1230aac509273938d23c544283a476bf84ac79b80540d503f941fe5"},
-    {file = 
"fastavro-1.5.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:78ffb34790791a82edef9cce7e49e44faf1b6b0b1da12e9c7d45f31fd6655a7b"},
-    {file = "fastavro-1.5.3-cp37-cp37m-win_amd64.whl", hash = 
"sha256:86a33b2f819cad67f002250e9c6a62909fc31e048679da5f2ac02f4bbf0a5998"},
-    {file = "fastavro-1.5.3-cp38-cp38-macosx_10_15_x86_64.whl", hash = 
"sha256:c5043cfbdfed27f1331860f1d215db838b92d575e8a78915c52c3f6c64d42176"},
-    {file = 
"fastavro-1.5.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:badcbd1c34f446f4b64ade84f9517ef7d52197e0ea9dd6b3775274f2271dba9a"},
-    {file = 
"fastavro-1.5.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash 
= "sha256:259ade020a70870858b656219d25f566eb620e52967070f9e18b040f1c0cd482"},
-    {file = "fastavro-1.5.3-cp38-cp38-win_amd64.whl", hash = 
"sha256:b0408d6b02abcbd4e8866b6b027d7cdd20d570e78177fb7c363fb3cc8ecb02a3"},
-    {file = "fastavro-1.5.3-cp39-cp39-macosx_10_15_x86_64.whl", hash = 
"sha256:063f17e000b7a4174ad362ffc91b75de85970cc67fccfed2bd12e21908de6028"},
-    {file = 
"fastavro-1.5.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:9b399317d4f559d8f3ef695ef8db8ea230e4b489437d2a1e50307120e181e831"},
-    {file = 
"fastavro-1.5.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash 
= "sha256:298e98c55e98f579df66e8258a942494a64b5340212968ed1b6bd9a68c52db5f"},
-    {file = "fastavro-1.5.3-cp39-cp39-win_amd64.whl", hash = 
"sha256:14c23dc870785d860f1523e7be7ce4079dc36070f1bad18139304a5f4d654180"},
-    {file = "fastavro-1.5.3.tar.gz", hash = 
"sha256:262a9f629448a0d4941a4871fd958b70a01f48a0106644b40e4fbaf984f5e89e"},
-]
+fastavro = []
 filelock = [
     {file = "filelock-3.7.1-py3-none-any.whl", hash = 
"sha256:37def7b658813cda163b56fc564cdc75e86d338246458c4c28ae84cabefa2404"},
     {file = "filelock-3.7.1.tar.gz", hash = 
"sha256:3a0fd85166ad9dbab54c9aec96737b744106dc5f15c0b09a6744a445299fcf04"},
 ]
-identify = [
-    {file = "identify-2.5.2-py2.py3-none-any.whl", hash = 
"sha256:feaa9db2dc0ce333b453ce171c0cf1247bbfde2c55fc6bb785022d411a1b78b5"},
-    {file = "identify-2.5.2.tar.gz", hash = 
"sha256:a3d4c096b384d50d5e6dc5bc8b9bc44f1f61cefebd750a7b3e9f939b53fb214d"},
+identify = []
+idna = [
+    {file = "idna-3.3-py3-none-any.whl", hash = 
"sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"},
+    {file = "idna-3.3.tar.gz", hash = 
"sha256:9d643ff0a55b762d5cdb124b8eaa99c66322e2157b69160bc32796e824360e6d"},
 ]
 importlib-metadata = [
     {file = "importlib_metadata-4.12.0-py3-none-any.whl", hash = 
"sha256:7401a975809ea1fdc658c3aa4f78cc2195a0e019c5cbc4c06122884e9ae80c23"},
@@ -617,30 +635,7 @@ nodeenv = [
     {file = "nodeenv-1.7.0-py2.py3-none-any.whl", hash = 
"sha256:27083a7b96a25f2f5e1d8cb4b6317ee8aeda3bdd121394e5ac54e498028a042e"},
     {file = "nodeenv-1.7.0.tar.gz", hash = 
"sha256:e0e7f7dfb85fc5394c6fe1e8fa98131a2473e04311a45afb6508f7cf1836fa2b"},
 ]
-numpy = [
-    {file = "numpy-1.23.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = 
"sha256:b15c3f1ed08df4980e02cc79ee058b788a3d0bef2fb3c9ca90bb8cbd5b8a3a04"},
-    {file = "numpy-1.23.1-cp310-cp310-macosx_11_0_arm64.whl", hash = 
"sha256:9ce242162015b7e88092dccd0e854548c0926b75c7924a3495e02c6067aba1f5"},
-    {file = 
"numpy-1.23.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:e0d7447679ae9a7124385ccf0ea990bb85bb869cef217e2ea6c844b6a6855073"},
-    {file = 
"numpy-1.23.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash 
= "sha256:3119daed207e9410eaf57dcf9591fdc68045f60483d94956bee0bfdcba790953"},
-    {file = "numpy-1.23.1-cp310-cp310-win32.whl", hash = 
"sha256:3ab67966c8d45d55a2bdf40701536af6443763907086c0a6d1232688e27e5447"},
-    {file = "numpy-1.23.1-cp310-cp310-win_amd64.whl", hash = 
"sha256:1865fdf51446839ca3fffaab172461f2b781163f6f395f1aed256b1ddc253622"},
-    {file = "numpy-1.23.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = 
"sha256:aeba539285dcf0a1ba755945865ec61240ede5432df41d6e29fab305f4384db2"},
-    {file = "numpy-1.23.1-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:7e8229f3687cdadba2c4faef39204feb51ef7c1a9b669247d49a24f3e2e1617c"},
-    {file = 
"numpy-1.23.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash 
= "sha256:68b69f52e6545af010b76516f5daaef6173e73353e3295c5cb9f96c35d755641"},
-    {file = 
"numpy-1.23.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:1408c3527a74a0209c781ac82bde2182b0f0bf54dea6e6a363fe0cc4488a7ce7"},
-    {file = "numpy-1.23.1-cp38-cp38-win32.whl", hash = 
"sha256:47f10ab202fe4d8495ff484b5561c65dd59177949ca07975663f4494f7269e3e"},
-    {file = "numpy-1.23.1-cp38-cp38-win_amd64.whl", hash = 
"sha256:37e5ebebb0eb54c5b4a9b04e6f3018e16b8ef257d26c8945925ba8105008e645"},
-    {file = "numpy-1.23.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:173f28921b15d341afadf6c3898a34f20a0569e4ad5435297ba262ee8941e77b"},
-    {file = "numpy-1.23.1-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:876f60de09734fbcb4e27a97c9a286b51284df1326b1ac5f1bf0ad3678236b22"},
-    {file = 
"numpy-1.23.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash 
= "sha256:35590b9c33c0f1c9732b3231bb6a72d1e4f77872390c47d50a615686ae7ed3fd"},
-    {file = 
"numpy-1.23.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:a35c4e64dfca659fe4d0f1421fc0f05b8ed1ca8c46fb73d9e5a7f175f85696bb"},
-    {file = "numpy-1.23.1-cp39-cp39-win32.whl", hash = 
"sha256:c2f91f88230042a130ceb1b496932aa717dcbd665350beb821534c5c7e15881c"},
-    {file = "numpy-1.23.1-cp39-cp39-win_amd64.whl", hash = 
"sha256:37ece2bd095e9781a7156852e43d18044fd0d742934833335599c583618181b9"},
-    {file = "numpy-1.23.1-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = 
"sha256:8002574a6b46ac3b5739a003b5233376aeac5163e5dcd43dd7ad062f3e186129"},
-    {file = 
"numpy-1.23.1-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:5d732d17b8a9061540a10fda5bfeabca5785700ab5469a5e9b93aca5e2d3a5fb"},
-    {file = "numpy-1.23.1-pp38-pypy38_pp73-win_amd64.whl", hash = 
"sha256:55df0f7483b822855af67e38fb3a526e787adf189383b4934305565d71c4b148"},
-    {file = "numpy-1.23.1.tar.gz", hash = 
"sha256:d748ef349bfef2e1194b59da37ed5a29c19ea8d7e6342019921ba2ba4fd8b624"},
-]
+numpy = []
 packaging = [
     {file = "packaging-21.3-py3-none-any.whl", hash = 
"sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"},
     {file = "packaging-21.3.tar.gz", hash = 
"sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"},
@@ -657,10 +652,7 @@ pluggy = [
     {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = 
"sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"},
     {file = "pluggy-1.0.0.tar.gz", hash = 
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"},
 ]
-pre-commit = [
-    {file = "pre_commit-2.20.0-py2.py3-none-any.whl", hash = 
"sha256:51a5ba7c480ae8072ecdb6933df22d2f812dc897d5fe848778116129a681aac7"},
-    {file = "pre_commit-2.20.0.tar.gz", hash = 
"sha256:a978dac7bc9ec0bcee55c18a277d553b0f419d259dadb4b9418ff2d00eb43959"},
-]
+pre-commit = []
 py = [
     {file = "py-1.11.0-py2.py3-none-any.whl", hash = 
"sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"},
     {file = "py-1.11.0.tar.gz", hash = 
"sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
@@ -835,6 +827,11 @@ pyyaml = [
     {file = "PyYAML-6.0-cp39-cp39-win_amd64.whl", hash = 
"sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c"},
     {file = "PyYAML-6.0.tar.gz", hash = 
"sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2"},
 ]
+requests = [
+    {file = "requests-2.28.1-py3-none-any.whl", hash = 
"sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349"},
+    {file = "requests-2.28.1.tar.gz", hash = 
"sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983"},
+]
+requests-mock = []
 six = [
     {file = "six-1.16.0-py2.py3-none-any.whl", hash = 
"sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
     {file = "six-1.16.0.tar.gz", hash = 
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
@@ -851,57 +848,10 @@ typing-extensions = [
     {file = "typing_extensions-4.3.0-py3-none-any.whl", hash = 
"sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02"},
     {file = "typing_extensions-4.3.0.tar.gz", hash = 
"sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"},
 ]
+urllib3 = []
 virtualenv = [
     {file = "virtualenv-20.15.1-py2.py3-none-any.whl", hash = 
"sha256:b30aefac647e86af6d82bfc944c556f8f1a9c90427b2fb4e3bfbf338cb82becf"},
     {file = "virtualenv-20.15.1.tar.gz", hash = 
"sha256:288171134a2ff3bfb1a2f54f119e77cd1b81c29fc1265a2356f3e8d14c7d58c4"},
 ]
-zipp = [
-    {file = "zipp-3.8.1-py3-none-any.whl", hash = 
"sha256:47c40d7fe183a6f21403a199b3e4192cca5774656965b0a4988ad2f8feb5f009"},
-    {file = "zipp-3.8.1.tar.gz", hash = 
"sha256:05b45f1ee8f807d0cc928485ca40a07cb491cf092ff587c0df9cb1fd154848d2"},
-]
-zstandard = [
-    {file = "zstandard-0.18.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = 
"sha256:ef7e8a200e4c8ac9102ed3c90ed2aa379f6b880f63032200909c1be21951f556"},
-    {file = "zstandard-0.18.0-cp310-cp310-macosx_11_0_arm64.whl", hash = 
"sha256:2dc466207016564805e56d28375f4f533b525ff50d6776946980dff5465566ac"},
-    {file = 
"zstandard-0.18.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
 hash = 
"sha256:4a2ee1d4f98447f3e5183ecfce5626f983504a4a0c005fbe92e60fa8e5d547ec"},
-    {file = 
"zstandard-0.18.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:d956e2f03c7200d7e61345e0880c292783ec26618d0d921dcad470cb195bbce2"},
-    {file = 
"zstandard-0.18.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl",
 hash = 
"sha256:ce6f59cba9854fd14da5bfe34217a1501143057313966637b7291d1b0267bd1e"},
-    {file = 
"zstandard-0.18.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:a7fa67cba473623848b6e88acf8d799b1906178fd883fb3a1da24561c779593b"},
-    {file = 
"zstandard-0.18.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl",
 hash = 
"sha256:cdb44d7284c8c5dd1b66dfb86dda7f4560fa94bfbbc1d2da749ba44831335e32"},
-    {file = "zstandard-0.18.0-cp310-cp310-win32.whl", hash = 
"sha256:63694a376cde0aa8b1971d06ca28e8f8b5f492779cb6ee1cc46bbc3f019a42a5"},
-    {file = "zstandard-0.18.0-cp310-cp310-win_amd64.whl", hash = 
"sha256:702a8324cd90c74d9c8780d02bf55e79da3193c870c9665ad3a11647e3ad1435"},
-    {file = "zstandard-0.18.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = 
"sha256:46f679bc5dfd938db4fb058218d9dc4db1336ffaf1ea774ff152ecadabd40805"},
-    {file = 
"zstandard-0.18.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:dc2a4de9f363b3247d472362a65041fe4c0f59e01a2846b15d13046be866a885"},
-    {file = 
"zstandard-0.18.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:bd3220d7627fd4d26397211cb3b560ec7cc4a94b75cfce89e847e8ce7fabe32d"},
-    {file = 
"zstandard-0.18.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl",
 hash = 
"sha256:39e98cf4773234bd9cebf9f9db730e451dfcfe435e220f8921242afda8321887"},
-    {file = 
"zstandard-0.18.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:5228e596eb1554598c872a337bbe4e5afe41cd1f8b1b15f2e35b50d061e35244"},
-    {file = 
"zstandard-0.18.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl",
 hash = 
"sha256:d4a8fd45746a6c31e729f35196e80b8f1e9987c59f5ccb8859d7c6a6fbeb9c63"},
-    {file = "zstandard-0.18.0-cp36-cp36m-win32.whl", hash = 
"sha256:4cbb85f29a990c2fdbf7bc63246567061a362ddca886d7fae6f780267c0a9e67"},
-    {file = "zstandard-0.18.0-cp36-cp36m-win_amd64.whl", hash = 
"sha256:bfa6c8549fa18e6497a738b7033c49f94a8e2e30c5fbe2d14d0b5aa8bbc1695d"},
-    {file = "zstandard-0.18.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = 
"sha256:e02043297c1832f2666cd2204f381bef43b10d56929e13c42c10c732c6e3b4ed"},
-    {file = 
"zstandard-0.18.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:7231543d38d2b7e02ef7cc78ef7ffd86419437e1114ff08709fe25a160e24bd6"},
-    {file = 
"zstandard-0.18.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:c86befac87445927488f5c8f205d11566f64c11519db223e9d282b945fa60dab"},
-    {file = 
"zstandard-0.18.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl",
 hash = 
"sha256:999a4e1768f219826ba3fa2064fab1c86dd72fdd47a42536235478c3bb3ca3e2"},
-    {file = 
"zstandard-0.18.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:9df59cd1cf3c62075ee2a4da767089d19d874ac3ad42b04a71a167e91b384722"},
-    {file = 
"zstandard-0.18.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl",
 hash = 
"sha256:1be31e9e3f7607ee0cdd60915410a5968b205d3e7aa83b7fcf3dd76dbbdb39e0"},
-    {file = "zstandard-0.18.0-cp37-cp37m-win32.whl", hash = 
"sha256:490d11b705b8ae9dc845431bacc8dd1cef2408aede176620a5cd0cd411027936"},
-    {file = "zstandard-0.18.0-cp37-cp37m-win_amd64.whl", hash = 
"sha256:266aba27fa9cc5e9091d3d325ebab1fa260f64e83e42516d5e73947c70216a5b"},
-    {file = "zstandard-0.18.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = 
"sha256:8b2260c4e07dd0723eadb586de7718b61acca4083a490dda69c5719d79bc715c"},
-    {file = "zstandard-0.18.0-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:3af8c2383d02feb6650e9255491ec7d0824f6e6dd2bbe3e521c469c985f31fb1"},
-    {file = 
"zstandard-0.18.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:28723a1d2e4df778573b76b321ebe9f3469ac98988104c2af116dd344802c3f8"},
-    {file = 
"zstandard-0.18.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:19cac7108ff2c342317fad6dc97604b47a41f403c8f19d0bfc396dfadc3638b8"},
-    {file = 
"zstandard-0.18.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl",
 hash = 
"sha256:76725d1ee83a8915100a310bbad5d9c1fc6397410259c94033b8318d548d9990"},
-    {file = 
"zstandard-0.18.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:d716a7694ce1fa60b20bc10f35c4a22be446ef7f514c8dbc8f858b61976de2fb"},
-    {file = 
"zstandard-0.18.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl",
 hash = 
"sha256:49685bf9a55d1ab34bd8423ea22db836ba43a181ac6b045ac4272093d5cb874e"},
-    {file = "zstandard-0.18.0-cp38-cp38-win32.whl", hash = 
"sha256:1af1268a7dc870eb27515fb8db1f3e6c5a555d2b7bcc476fc3bab8886c7265ab"},
-    {file = "zstandard-0.18.0-cp38-cp38-win_amd64.whl", hash = 
"sha256:1dc2d3809e763055a1a6c1a73f2b677320cc9a5aa1a7c6cfb35aee59bddc42d9"},
-    {file = "zstandard-0.18.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:eea18c1e7442f2aa9aff1bb84550dbb6a1f711faf6e48e7319de8f2b2e923c2a"},
-    {file = "zstandard-0.18.0-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:8677ffc6a6096cccbd892e558471c901fd821aba12b7fbc63833c7346f549224"},
-    {file = 
"zstandard-0.18.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = 
"sha256:083dc08abf03807af9beeb2b6a91c23ad78add2499f828176a3c7b742c44df02"},
-    {file = 
"zstandard-0.18.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:c990063664c08169c84474acecc9251ee035871589025cac47c060ff4ec4bc1a"},
-    {file = 
"zstandard-0.18.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl",
 hash = 
"sha256:533db8a6fac6248b2cb2c935e7b92f994efbdeb72e1ffa0b354432e087bb5a3e"},
-    {file = 
"zstandard-0.18.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
 hash = 
"sha256:dbb3cb8a082d62b8a73af42291569d266b05605e017a3d8a06a0e5c30b5f10f0"},
-    {file = 
"zstandard-0.18.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl",
 hash = 
"sha256:d6c85ca5162049ede475b7ec98e87f9390501d44a3d6776ddd504e872464ec25"},
-    {file = "zstandard-0.18.0-cp39-cp39-win32.whl", hash = 
"sha256:75479e7c2b3eebf402c59fbe57d21bc400cefa145ca356ee053b0a08908c5784"},
-    {file = "zstandard-0.18.0-cp39-cp39-win_amd64.whl", hash = 
"sha256:d85bfabad444812133a92fc6fbe463e1d07581dba72f041f07a360e63808b23c"},
-    {file = "zstandard-0.18.0.tar.gz", hash = 
"sha256:0ac0357a0d985b4ff31a854744040d7b5754385d1f98f7145c30e02c6865cb6f"},
-]
+zipp = []
+zstandard = []
diff --git a/python/pyiceberg/catalog/base.py b/python/pyiceberg/catalog/base.py
index 6a2dcf9e0..d70fd7b57 100644
--- a/python/pyiceberg/catalog/base.py
+++ b/python/pyiceberg/catalog/base.py
@@ -18,11 +18,20 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
+from dataclasses import dataclass
 
 from pyiceberg.catalog import Identifier, Properties
 from pyiceberg.schema import Schema
 from pyiceberg.table.base import Table
-from pyiceberg.table.partitioning import PartitionSpec
+from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionSpec
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+
+
+@dataclass
+class PropertiesUpdateSummary:
+    removed: list[str]
+    updated: list[str]
+    missing: list[str]
 
 
 class Catalog(ABC):
@@ -57,7 +66,8 @@ class Catalog(ABC):
         identifier: str | Identifier,
         schema: Schema,
         location: str | None = None,
-        partition_spec: PartitionSpec | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
         properties: Properties | None = None,
     ) -> Table:
         """Create a table
@@ -66,7 +76,8 @@ class Catalog(ABC):
             identifier: Table identifier.
             schema: Table's schema.
             location: Location for the table. Optional Argument.
-            partition_spec: PartitionSpec for the table. Optional Argument.
+            partition_spec: PartitionSpec for the table.
+            sort_order: SortOrder for the table.
             properties: Table properties that can be a string based 
dictionary. Optional Argument.
 
         Returns:
@@ -195,7 +206,7 @@ class Catalog(ABC):
     @abstractmethod
     def update_namespace_properties(
         self, namespace: str | Identifier, removals: set[str] | None = None, 
updates: Properties | None = None
-    ) -> None:
+    ) -> PropertiesUpdateSummary:
         """Removes provided property keys and updates properties for a 
namespace.
 
         Args:
diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py
new file mode 100644
index 000000000..fc228295c
--- /dev/null
+++ b/python/pyiceberg/catalog/rest.py
@@ -0,0 +1,434 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from json import JSONDecodeError
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+    Union,
+)
+
+import requests
+from pydantic import Field
+from requests import HTTPError
+
+from pyiceberg import __version__
+from pyiceberg.catalog import Identifier, Properties
+from pyiceberg.catalog.base import Catalog, PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    AlreadyExistsError,
+    AuthorizationExpiredError,
+    BadCredentialsError,
+    BadRequestError,
+    ForbiddenError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    RESTError,
+    ServerError,
+    ServiceUnavailableError,
+    TableAlreadyExistsError,
+    UnauthorizedError,
+)
+from pyiceberg.schema import Schema
+from pyiceberg.table.base import Table
+from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
+from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionSpec
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+
+
+class Endpoints:
+    get_config: str = "config"
+    list_namespaces: str = "namespaces"
+    create_namespace: str = "namespaces"
+    load_namespace_metadata: str = "namespaces/{namespace}"
+    drop_namespace: str = "namespaces/{namespace}"
+    update_properties: str = "namespaces/{namespace}/properties"
+    list_tables: str = "namespaces/{namespace}/tables"
+    create_table: str = "namespaces/{namespace}/tables"
+    load_table: str = "namespaces/{namespace}/tables/{table}"
+    update_table: str = "namespaces/{namespace}/tables/{table}"
+    drop_table: str = 
"namespaces/{namespace}/tables/{table}?purgeRequested={purge}"
+    table_exists: str = "namespaces/{namespace}/tables/{table}"
+    get_token: str = "oauth/tokens"
+    rename_table: str = "tables/rename"
+
+
+AUTHORIZATION_HEADER = "Authorization"
+BEARER_PREFIX = "Bearer"
+CATALOG_SCOPE = "catalog"
+CLIENT_ID = "client_id"
+PREFIX = "prefix"
+CLIENT_SECRET = "client_secret"
+CLIENT_CREDENTIALS = "client_credentials"
+CREDENTIAL = "credential"
+GRANT_TYPE = "grant_type"
+SCOPE = "scope"
+TOKEN_EXCHANGE = "urn:ietf:params:oauth:grant-type:token-exchange"
+
+NAMESPACE_SEPARATOR = b"\x1F".decode("UTF-8")
+
+
+class TableResponse(IcebergBaseModel):
+    metadata_location: Optional[str] = Field(alias="metadata-location", 
default=None)
+    metadata: Union[TableMetadataV1, TableMetadataV2] = Field()
+    config: Properties = Field(default_factory=dict)
+
+
+class CreateTableRequest(IcebergBaseModel):
+    name: str = Field()
+    location: Optional[str] = Field()
+    table_schema: Schema = Field(alias="schema")
+    partition_spec: Optional[PartitionSpec] = Field(alias="partition-spec")
+    write_order: Optional[SortOrder] = Field(alias="write-order")
+    stage_create: bool = Field(alias="stage-create", default=False)
+    properties: Properties = Field(default_factory=dict)
+
+
+class TokenResponse(IcebergBaseModel):
+    access_token: str = Field()
+    token_type: str = Field()
+    expires_in: int = Field()
+    issued_token_type: str = Field()
+
+
+class ConfigResponse(IcebergBaseModel):
+    defaults: Properties = Field()
+    overrides: Properties = Field()
+
+
+class ListNamespaceResponse(IcebergBaseModel):
+    namespaces: List[Identifier] = Field()
+
+
+class NamespaceResponse(IcebergBaseModel):
+    namespace: Identifier = Field()
+    properties: Properties = Field()
+
+
+class UpdateNamespacePropertiesResponse(IcebergBaseModel):
+    removed: List[str] = Field()
+    updated: List[str] = Field()
+    missing: List[str] = Field()
+
+
+class ListTableResponseEntry(IcebergBaseModel):
+    name: str = Field()
+    namespace: Identifier = Field()
+
+
+class ListTablesResponse(IcebergBaseModel):
+    identifiers: List[ListTableResponseEntry] = Field()
+
+
+class ErrorResponseMessage(IcebergBaseModel):
+    message: str = Field()
+    type: str = Field()
+    code: int = Field()
+
+
+class ErrorResponse(IcebergBaseModel):
+    error: ErrorResponseMessage = Field()
+
+
+class RestCatalog(Catalog):
+    token: str
+    config: Properties
+
+    host: str
+
+    def __init__(
+        self,
+        name: str,
+        properties: Properties,
+        host: str,
+        client_id: Optional[str] = None,
+        client_secret: Optional[str] = None,
+        token: Optional[str] = None,
+    ):
+        """Rest Catalog
+
+        You either need to provide a client_id and client_secret, or an 
already valid token.
+
+        Args:
+            name: Name to identify the catalog
+            properties: Properties that are passed along to the configuration
+            host: The base-url of the REST Catalog endpoint
+            client_id: The id to identify the client
+            client_secret: The secret for the client
+            token: The bearer token
+        """
+        self.host = host
+        if client_id and client_secret:
+            self.token = self._fetch_access_token(client_id, client_secret)
+        elif token:
+            self.token = token
+        else:
+            raise ValueError("Either set the client_id and client_secret, or 
provide a valid token")
+        self.config = self._fetch_config(properties)
+        super().__init__(name, properties)
+
+    @staticmethod
+    def _split_credential(token: str) -> Tuple[str, str]:
+        """Splits the token in a client id and secret
+
+        Args:
+            token: The token with a semicolon as a separator
+
+        Returns:
+            The client id and secret
+        """
+        client, secret = token.split(":")
+        return client, secret
+
+    @property
+    def headers(self) -> Properties:
+        return {
+            AUTHORIZATION_HEADER: f"{BEARER_PREFIX} {self.token}",
+            "Content-type": "application/json",
+            "X-Client-Version": __version__,
+        }
+
+    def url(self, endpoint: str, prefixed: bool = True, **kwargs) -> str:
+        """Constructs the endpoint
+
+        Args:
+            prefixed: If the prefix return by the config needs to be appended
+
+        Returns:
+            The base url of the rest catalog
+        """
+
+        url = self.host
+        url = url + "v1/" if url.endswith("/") else url + "/v1/"
+
+        if prefixed:
+            url += self.config.get(PREFIX, "")
+            url = url if url.endswith("/") else url + "/"
+
+        return url + endpoint.format(**kwargs)
+
+    def _fetch_access_token(self, client_id: str, client_secret: str) -> str:
+        data = {GRANT_TYPE: CLIENT_CREDENTIALS, CLIENT_ID: client_id, 
CLIENT_SECRET: client_secret, SCOPE: CATALOG_SCOPE}
+        url = self.url(Endpoints.get_token, prefixed=False)
+        # Uses application/x-www-form-urlencoded by default
+        response = requests.post(url=url, data=data)
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {401: BadCredentialsError})
+
+        return TokenResponse(**response.json()).access_token
+
+    def _fetch_config(self, properties: Properties) -> Properties:
+        response = requests.get(self.url(Endpoints.get_config, 
prefixed=False), headers=self.headers)
+        response.raise_for_status()
+        config_response = ConfigResponse(**response.json())
+        config = config_response.defaults
+        config.update(properties)
+        config.update(config_response.overrides)
+        return config
+
+    def _split_identifier_for_path(self, identifier: Union[str, Identifier]) 
-> Properties:
+        identifier = self.identifier_to_tuple(identifier)
+        return {"namespace": NAMESPACE_SEPARATOR.join(identifier[:-1]), 
"table": identifier[-1]}
+
+    def _split_identifier_for_json(self, identifier: Union[str, Identifier]) 
-> Dict[str, Union[Identifier, str]]:
+        identifier = self.identifier_to_tuple(identifier)
+        return {"namespace": identifier[:-1], "name": identifier[-1]}
+
+    def _handle_non_200_response(self, exc: HTTPError, error_handler: 
Dict[int, Type[Exception]]):
+        try:
+            response = ErrorResponse(**exc.response.json())
+        except JSONDecodeError:
+            # In the case we don't have a proper response
+            response = ErrorResponse(
+                error=ErrorResponseMessage(
+                    message=f"Could not decode json payload: 
{exc.response.text}",
+                    type="RESTError",
+                    code=exc.response.status_code,
+                )
+            )
+
+        code = exc.response.status_code
+        if code in error_handler:
+            raise error_handler[code](response.error.message) from exc
+        elif code == 400:
+            raise BadRequestError(response.error.message) from exc
+        elif code == 401:
+            raise UnauthorizedError(response.error.message) from exc
+        elif code == 403:
+            raise ForbiddenError(response.error.message) from exc
+        elif code == 422:
+            raise RESTError(response.error.message) from exc
+        elif code == 419:
+            raise AuthorizationExpiredError(response.error.message)
+        elif code == 501:
+            raise NotImplementedError(response.error.message)
+        elif code == 503:
+            raise ServiceUnavailableError(response.error.message) from exc
+        elif 500 <= code < 600:
+            raise ServerError(response.error.message) from exc
+        else:
+            raise RESTError(response.error.message) from exc
+
+    def create_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Schema,
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Optional[Properties] = None,
+    ) -> Table:
+        namespace_and_table = self._split_identifier_for_path(identifier)
+        properties = properties or {}
+        request = CreateTableRequest(
+            name=namespace_and_table["table"],
+            location=location,
+            table_schema=schema,
+            partition_spec=partition_spec,
+            write_order=sort_order,
+            properties=properties,
+        )
+        serialized_json = request.json()
+        response = requests.post(
+            self.url(Endpoints.create_table, 
namespace=namespace_and_table["namespace"]),
+            data=serialized_json,
+            headers=self.headers,
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
+
+        table_response = TableResponse(**response.json())
+
+        return Table(
+            identifier=(self.name,) + self.identifier_to_tuple(identifier),
+            metadata_location=table_response.metadata_location,
+            metadata=table_response.metadata,
+        )
+
+    def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) 
-> List[Identifier]:
+        namespace_concat = 
NAMESPACE_SEPARATOR.join(self.identifier_to_tuple(namespace or ""))
+        response = requests.get(
+            self.url(Endpoints.list_tables, namespace=namespace_concat),
+            headers=self.headers,
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchNamespaceError})
+        return [(*table.namespace, table.name) for table in 
ListTablesResponse(**response.json()).identifiers]
+
+    def load_table(self, identifier: Union[str, Identifier]) -> Table:
+        response = requests.get(
+            self.url(Endpoints.load_table, prefixed=True, 
**self._split_identifier_for_path(identifier)), headers=self.headers
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchTableError})
+
+        table_response = TableResponse(**response.json())
+
+        return Table(
+            identifier=(self.name,) + self.identifier_to_tuple(identifier),
+            metadata_location=table_response.metadata_location,
+            metadata=table_response.metadata,
+        )
+
+    def drop_table(self, identifier: Union[str, Identifier], purge_requested: 
bool = False) -> None:
+        response = requests.delete(
+            self.url(Endpoints.drop_table, prefixed=True, 
purge=purge_requested, **self._split_identifier_for_path(identifier)),
+            headers=self.headers,
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchTableError})
+
+    def purge_table(self, identifier: Union[str, Identifier]) -> None:
+        self.drop_table(identifier=identifier, purge_requested=True)
+
+    def rename_table(self, from_identifier: Union[str, Identifier], 
to_identifier: Union[str, Identifier]):
+        payload = {
+            "source": self._split_identifier_for_json(from_identifier),
+            "destination": self._split_identifier_for_json(to_identifier),
+        }
+        response = requests.post(self.url(Endpoints.rename_table), 
json=payload, headers=self.headers)
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchTableError, 409: 
TableAlreadyExistsError})
+
+    def create_namespace(self, namespace: Union[str, Identifier], properties: 
Optional[Properties] = None) -> None:
+        payload = {"namespace": self.identifier_to_tuple(namespace), 
"properties": properties or {}}
+        response = requests.post(self.url(Endpoints.create_namespace), 
json=payload, headers=self.headers)
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchNamespaceError, 
409: AlreadyExistsError})
+
+    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+        namespace = 
NAMESPACE_SEPARATOR.join(self.identifier_to_tuple(namespace))
+        response = requests.delete(self.url(Endpoints.drop_namespace, 
namespace=namespace), headers=self.headers)
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchNamespaceError})
+
+    def list_namespaces(self) -> List[Identifier]:
+        response = requests.get(self.url(Endpoints.list_namespaces), 
headers=self.headers)
+        response.raise_for_status()
+        namespaces = ListNamespaceResponse(**response.json())
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {})
+        return namespaces.namespaces
+
+    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> 
Properties:
+        namespace = 
NAMESPACE_SEPARATOR.join(self.identifier_to_tuple(namespace))
+        response = requests.get(self.url(Endpoints.load_namespace_metadata, 
namespace=namespace), headers=self.headers)
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchNamespaceError})
+
+        return NamespaceResponse(**response.json()).properties
+
+    def update_namespace_properties(
+        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] 
= None, updates: Optional[Properties] = None
+    ) -> PropertiesUpdateSummary:
+        namespace = 
NAMESPACE_SEPARATOR.join(self.identifier_to_tuple(namespace))
+        payload = {"removals": list(removals or []), "updates": updates}
+        response = requests.post(self.url(Endpoints.update_properties, 
namespace=namespace), json=payload, headers=self.headers)
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {404: NoSuchNamespaceError})
+        parsed_response = UpdateNamespacePropertiesResponse(**response.json())
+        return PropertiesUpdateSummary(
+            removed=parsed_response.removed,
+            updated=parsed_response.updated,
+            missing=parsed_response.missing,
+        )
diff --git a/python/pyiceberg/exceptions.py b/python/pyiceberg/exceptions.py
index f9ac3333b..f38cd0df6 100644
--- a/python/pyiceberg/exceptions.py
+++ b/python/pyiceberg/exceptions.py
@@ -16,12 +16,8 @@
 #  under the License.
 
 
-class NoSuchTableError(Exception):
-    """Raised when a referenced table is not found"""
-
-
-class NoSuchNamespaceError(Exception):
-    """Raised when a referenced name-space is not found"""
+class TableAlreadyExistsError(Exception):
+    """Raised when creating a table with a name that already exists"""
 
 
 class NamespaceNotEmptyError(Exception):
@@ -34,3 +30,43 @@ class AlreadyExistsError(Exception):
 
 class ValidationError(Exception):
     """Raises when there is an issue with the schema"""
+
+
+class RESTError(Exception):
+    """Raises when there is an unknown response from the REST Catalog"""
+
+
+class BadCredentialsError(RESTError):
+    """Raises when providing invalid credentials"""
+
+
+class BadRequestError(RESTError):
+    """Raises when an invalid request is being made"""
+
+
+class UnauthorizedError(RESTError):
+    """Raises when you don't have the proper authorization"""
+
+
+class ServiceUnavailableError(RESTError):
+    """Raises when the service doesn't respond"""
+
+
+class ServerError(RESTError):
+    """Raises when there is an unhandled exception on the server side"""
+
+
+class ForbiddenError(RESTError):
+    """Raises when you don't have the credentials to perform the action on the 
REST catalog"""
+
+
+class AuthorizationExpiredError(RESTError):
+    """When the credentials are expired when performing an action on the REST 
catalog"""
+
+
+class NoSuchTableError(RESTError):
+    """Raises when the table can't be found in the REST catalog"""
+
+
+class NoSuchNamespaceError(RESTError):
+    """Raised when a referenced name-space is not found"""
diff --git a/python/pyiceberg/schema.py b/python/pyiceberg/schema.py
index ea4f4cf6c..5e5d4af56 100644
--- a/python/pyiceberg/schema.py
+++ b/python/pyiceberg/schema.py
@@ -24,6 +24,7 @@ from typing import (
     Dict,
     Generic,
     List,
+    Literal,
     Optional,
     Tuple,
     TypeVar,
@@ -54,6 +55,7 @@ class Schema(IcebergBaseModel):
         >>> from pyiceberg import types
     """
 
+    type: Literal["struct"] = "struct"
     fields: Tuple[NestedField, ...] = Field(default_factory=tuple)
     schema_id: int = Field(alias="schema-id")
     identifier_field_ids: List[int] = Field(alias="identifier-field-ids", 
default_factory=list)
diff --git a/python/pyiceberg/table/base.py b/python/pyiceberg/table/base.py
index 77821547e..5e7c06a5b 100644
--- a/python/pyiceberg/table/base.py
+++ b/python/pyiceberg/table/base.py
@@ -15,17 +15,16 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from __future__ import annotations
+from typing import Optional, Union
 
-from abc import ABC
+from pydantic import Field
 
 from pyiceberg.catalog.base import Identifier
+from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
+from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 
-class Table(ABC):
-    """Placeholder for Table managed by the Catalog that points to the current 
Table Metadata.
-
-    To be implemented by https://github.com/apache/iceberg/issues/3227
-    """
-
-    identifier: str | Identifier
+class Table(IcebergBaseModel):
+    identifier: Identifier = Field()
+    metadata_location: Optional[str] = Field()
+    metadata: Union[TableMetadataV1, TableMetadataV2] = Field()
diff --git a/python/pyiceberg/table/metadata.py 
b/python/pyiceberg/table/metadata.py
index 6543d4ce3..9608bf138 100644
--- a/python/pyiceberg/table/metadata.py
+++ b/python/pyiceberg/table/metadata.py
@@ -38,6 +38,7 @@ from pyiceberg.utils.iceberg_base_model import 
IcebergBaseModel
 INITIAL_SEQUENCE_NUMBER = 0
 INITIAL_SPEC_ID = 0
 DEFAULT_SCHEMA_ID = 0
+DEFAULT_LAST_PARTITION_ID = 1000
 
 
 def check_schemas(values: Dict[str, Any]) -> Dict[str, Any]:
@@ -103,7 +104,7 @@ class TableMetadataCommonFields(IcebergBaseModel):
     """The table’s base location. This is used by writers to determine where
     to store data files, manifest files, and table metadata files."""
 
-    table_uuid: Optional[UUID] = Field(alias="table-uuid")
+    table_uuid: Optional[UUID] = Field(alias="table-uuid", 
default_factory=uuid4)
     """A UUID that identifies the table, generated when the table is created.
     Implementations must throw an exception if a table’s UUID does not match
     the expected UUID after refreshing metadata."""
@@ -210,12 +211,10 @@ class TableMetadataV1(TableMetadataCommonFields, 
IcebergBaseModel):
         Returns:
             The TableMetadata with the defaults applied
         """
-        if "schema-id" not in data["schema"]:
+        if data.get("schema") and "schema-id" not in data["schema"]:
             data["schema"]["schema-id"] = DEFAULT_SCHEMA_ID
-        if "last-partition-id" not in data:
+        if data.get("partition-spec") and "last-partition-id" not in data:
             data["last-partition-id"] = max(spec["field-id"] for spec in 
data["partition-spec"])
-        if "table-uuid" not in data:
-            data["table-uuid"] = uuid4()
         return data
 
     @root_validator(skip_on_failure=True)
@@ -236,7 +235,7 @@ class TableMetadataV1(TableMetadataCommonFields, 
IcebergBaseModel):
             schema = data["schema_"]
             data["schemas"] = [schema]
         else:
-            check_schemas(data["schemas"])
+            check_schemas(data)
         return data
 
     @root_validator(skip_on_failure=True)
@@ -257,7 +256,7 @@ class TableMetadataV1(TableMetadataCommonFields, 
IcebergBaseModel):
             fields = data["partition_spec"]
             data["partition_specs"] = [PartitionSpec(spec_id=INITIAL_SPEC_ID, 
fields=fields)]
         else:
-            check_partition_specs(data["partition_specs"])
+            check_partition_specs(data)
         return data
 
     @root_validator(skip_on_failure=True)
@@ -273,10 +272,10 @@ class TableMetadataV1(TableMetadataCommonFields, 
IcebergBaseModel):
         Returns:
             The TableMetadata with the sort_orders set, if not provided
         """
-        if sort_orders := data.get("sort_orders"):
-            check_sort_orders(sort_orders)
-        else:
+        if not data.get("sort_orders"):
             data["sort_orders"] = [UNSORTED_SORT_ORDER]
+        else:
+            check_sort_orders(data)
         return data
 
     def to_v2(self) -> "TableMetadataV2":
diff --git a/python/pyiceberg/table/partitioning.py 
b/python/pyiceberg/table/partitioning.py
index ef080bc30..f5222ad59 100644
--- a/python/pyiceberg/table/partitioning.py
+++ b/python/pyiceberg/table/partitioning.py
@@ -81,7 +81,7 @@ class PartitionSpec(IcebergBaseModel):
     """
 
     spec_id: int = Field(alias="spec-id")
-    fields: Tuple[PartitionField, ...] = Field()
+    fields: Tuple[PartitionField, ...] = Field(default_factory=tuple)
 
     def __init__(
         self,
@@ -154,3 +154,6 @@ class PartitionSpec(IcebergBaseModel):
             and this_field.name == that_field.name
             for this_field, that_field in zip(self.fields, other.fields)
         )
+
+
+UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
diff --git a/python/pyiceberg/utils/iceberg_base_model.py 
b/python/pyiceberg/utils/iceberg_base_model.py
index e218f9f89..e3ad2eab0 100644
--- a/python/pyiceberg/utils/iceberg_base_model.py
+++ b/python/pyiceberg/utils/iceberg_base_model.py
@@ -37,6 +37,7 @@ class IcebergBaseModel(BaseModel):
     class Config:
         keep_untouched = (cached_property,)
         allow_population_by_field_name = True
+        copy_on_model_validation = False
         frozen = True
 
     def dict(self, exclude_none: bool = True, **kwargs):
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 24cae0c81..55011bcaa 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -40,6 +40,8 @@ packages = [
 [tool.poetry.dependencies]
 python = "^3.8"
 mmh3 = "^3.0.0"
+requests = "^2.28.1"
+
 pydantic = "^1.9.1"
 
 pyarrow = { version = "^8.0.0", optional = true }
@@ -54,6 +56,7 @@ pytest-checkdocs = "^2.0.0"
 pre-commit = "^2.0.0"
 fastavro = "^1.5.1"
 coverage = { version = "^6.0.0", extras = ["toml"] }
+requests-mock = "^1.9.3"
 
 [build-system]
 requires = ["poetry-core>=1.0.0"]
@@ -111,5 +114,9 @@ ignore_missing_imports = true
 module = "mmh3.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "requests_mock.*"
+ignore_missing_imports = true
+
 [tool.coverage.run]
 source = ['pyiceberg/']
diff --git a/python/tests/catalog/test_base.py 
b/python/tests/catalog/test_base.py
index e4dd4d399..49790ecbf 100644
--- a/python/tests/catalog/test_base.py
+++ b/python/tests/catalog/test_base.py
@@ -26,7 +26,7 @@ from typing import (
 import pytest
 
 from pyiceberg.catalog import Identifier, Properties
-from pyiceberg.catalog.base import Catalog
+from pyiceberg.catalog.base import Catalog, PropertiesUpdateSummary
 from pyiceberg.exceptions import (
     AlreadyExistsError,
     NamespaceNotEmptyError,
@@ -36,7 +36,9 @@ from pyiceberg.exceptions import (
 from pyiceberg.schema import Schema
 from pyiceberg.table.base import Table
 from pyiceberg.table.metadata import INITIAL_SPEC_ID
-from pyiceberg.table.partitioning import PartitionSpec
+from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionSpec
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from tests.table.test_metadata import EXAMPLE_TABLE_METADATA_V1
 
 
 class InMemoryCatalog(Catalog):
@@ -55,7 +57,8 @@ class InMemoryCatalog(Catalog):
         identifier: Union[str, Identifier],
         schema: Schema,
         location: Optional[str] = None,
-        partition_spec: Optional[PartitionSpec] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
         properties: Optional[Properties] = None,
     ) -> Table:
 
@@ -68,8 +71,7 @@ class InMemoryCatalog(Catalog):
             if namespace not in self.__namespaces:
                 self.__namespaces[namespace] = {}
 
-            table = Table()
-            table.identifier = identifier
+            table = Table(identifier=identifier, 
metadata=EXAMPLE_TABLE_METADATA_V1)
             self.__tables[identifier] = table
             return table
 
@@ -102,9 +104,8 @@ class InMemoryCatalog(Catalog):
         if to_namespace not in self.__namespaces:
             self.__namespaces[to_namespace] = {}
 
-        table.identifier = to_identifier
-        self.__tables[to_identifier] = table
-        return table
+        self.__tables[to_identifier] = Table(identifier=to_identifier, 
metadata=table.metadata)
+        return self.__tables[to_identifier]
 
     def create_namespace(self, namespace: Union[str, Identifier], properties: 
Optional[Properties] = None) -> None:
         namespace = Catalog.identifier_to_tuple(namespace)
@@ -143,18 +144,30 @@ class InMemoryCatalog(Catalog):
 
     def update_namespace_properties(
         self, namespace: Union[str, Identifier], removals: Optional[Set[str]] 
= None, updates: Optional[Properties] = None
-    ) -> None:
+    ) -> PropertiesUpdateSummary:
+        removed: Set[str] = set()
+        updated: Set[str] = set()
+
         namespace = Catalog.identifier_to_tuple(namespace)
         if namespace in self.__namespaces:
             if removals:
                 for key in removals:
                     if key in self.__namespaces[namespace]:
                         del self.__namespaces[namespace][key]
+                        removed.add(key)
             if updates:
-                self.__namespaces[namespace].update(updates)
+                for key, value in updates.items():
+                    self.__namespaces[namespace][key] = value
+                    updated.add(key)
         else:
             raise NoSuchNamespaceError(f"Namespace does not exist: 
{namespace}")
 
+        expected_to_change = removed.difference(removals or set())
+
+        return PropertiesUpdateSummary(
+            removed=list(removed or []), updated=list(updates.keys() if 
updates else []), missing=list(expected_to_change)
+        )
+
 
 TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
 TEST_TABLE_NAMESPACE = ("com", "organization", "department")
@@ -383,11 +396,12 @@ def test_update_namespace_metadata(catalog: 
InMemoryCatalog):
 
     # When
     new_metadata = {"key3": "value3", "key4": "value4"}
-    catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, 
updates=new_metadata)
+    summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, 
updates=new_metadata)
 
     # Then
     assert TEST_TABLE_NAMESPACE in catalog.list_namespaces()
     assert new_metadata.items() <= 
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
+    assert summary == PropertiesUpdateSummary(removed=[], updated=["key3", 
"key4"], missing=[])
 
 
 def test_update_namespace_metadata_removals(catalog: InMemoryCatalog):
@@ -397,12 +411,13 @@ def test_update_namespace_metadata_removals(catalog: 
InMemoryCatalog):
     # When
     new_metadata = {"key3": "value3", "key4": "value4"}
     remove_metadata = {"key1"}
-    catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, remove_metadata, 
new_metadata)
+    summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, 
remove_metadata, new_metadata)
 
     # Then
     assert TEST_TABLE_NAMESPACE in catalog.list_namespaces()
     assert new_metadata.items() <= 
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
     assert 
remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys())
+    assert summary == PropertiesUpdateSummary(removed=["key1"], 
updated=["key3", "key4"], missing=[])
 
 
 def 
test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog:
 InMemoryCatalog):
diff --git a/python/tests/catalog/test_rest.py 
b/python/tests/catalog/test_rest.py
new file mode 100644
index 000000000..d5ccfe5b5
--- /dev/null
+++ b/python/tests/catalog/test_rest.py
@@ -0,0 +1,612 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+# pylint: disable=redefined-outer-name
+from uuid import UUID
+
+import pytest
+from requests_mock import Mocker
+
+from pyiceberg.catalog.base import PropertiesUpdateSummary, Table
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.exceptions import (
+    AlreadyExistsError,
+    BadCredentialsError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    TableAlreadyExistsError,
+)
+from pyiceberg.schema import Schema
+from pyiceberg.table.metadata import TableMetadataV1
+from pyiceberg.table.partitioning import PartitionField, PartitionSpec
+from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
+from pyiceberg.table.snapshots import Operation, Snapshot, Summary
+from pyiceberg.table.sorting import SortField, SortOrder
+from pyiceberg.transforms import IdentityTransform, TruncateTransform
+from pyiceberg.types import (
+    BooleanType,
+    IntegerType,
+    NestedField,
+    StringType,
+)
+
# Shared fixtures for the mocked REST catalog server.
TEST_HOST = "https://iceberg-test-catalog/"  # base URI registered with requests_mock
TEST_CLIENT_ID = "client"  # OAuth client credentials used in the token tests
TEST_CLIENT_SECRET = "secret"
TEST_TOKEN = "some_jwt_token"  # bearer token returned by the mocked OAuth endpoint
+
+
@pytest.fixture
def rest_mock(requests_mock: Mocker):
    """Return requests_mock pre-loaded with the /v1/config endpoint.

    RestCatalog fetches this endpoint while being constructed, so every
    test that instantiates a catalog needs it registered.
    """
    empty_config = {"defaults": {}, "overrides": {}}
    requests_mock.get(f"{TEST_HOST}v1/config", json=empty_config, status_code=200)
    return requests_mock
+
+
def test_token_200(rest_mock: Mocker):
    """A successful OAuth exchange exposes the access token on the catalog."""
    token_payload = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    }
    rest_mock.post(f"{TEST_HOST}v1/oauth/tokens", json=token_payload, status_code=200)

    catalog = RestCatalog("rest", {}, TEST_HOST, TEST_CLIENT_ID, TEST_CLIENT_SECRET)
    assert catalog.token == TEST_TOKEN


def test_token_401(rest_mock: Mocker):
    """Invalid credentials surface as BadCredentialsError with the server message."""
    message = "Invalid client ID: abc"
    error_payload = {
        "error": {
            "message": message,
            "type": "BadCredentialsException",
            "code": 401,
        }
    }
    rest_mock.post(f"{TEST_HOST}v1/oauth/tokens", json=error_payload, status_code=401)

    with pytest.raises(BadCredentialsError) as e:
        RestCatalog("rest", {}, TEST_HOST, client_id=TEST_CLIENT_ID, client_secret=TEST_CLIENT_SECRET)
    assert message in str(e.value)
+
+
def test_list_tables_200(rest_mock: Mocker):
    """list_tables converts the JSON identifiers into identifier tuples."""
    namespace = "examples"
    identifiers = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]}
    rest_mock.get(f"{TEST_HOST}v1/namespaces/{namespace}/tables", json=identifiers, status_code=200)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    assert catalog.list_tables(namespace) == [("examples", "fooshare")]


def test_list_tables_404(rest_mock: Mocker):
    """Listing tables of an unknown namespace raises NoSuchNamespaceError."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace does not exist: personal in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.get(f"{TEST_HOST}v1/namespaces/{namespace}/tables", json=error_body, status_code=404)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as e:
        catalog.list_tables(namespace)
    assert "Namespace does not exist" in str(e.value)
+
+
def test_list_namespaces_200(rest_mock: Mocker):
    """Every namespace in the response becomes a single-element tuple."""
    rest_mock.get(
        f"{TEST_HOST}v1/namespaces",
        json={"namespaces": [["default"], ["examples"], ["fokko"], ["system"]]},
        status_code=200,
    )
    expected = [("default",), ("examples",), ("fokko",), ("system",)]
    assert RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN).list_namespaces() == expected
+
+
def test_create_namespace_200(rest_mock: Mocker):
    """Creating a namespace succeeds silently on a 200 response."""
    namespace = "leden"
    rest_mock.post(
        f"{TEST_HOST}v1/namespaces",
        json={"namespace": [namespace], "properties": {}},
        status_code=200,
    )
    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    catalog.create_namespace(namespace)


def test_create_namespace_409(rest_mock: Mocker):
    """A 409 conflict on namespace creation raises AlreadyExistsError."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace already exists: fokko in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(f"{TEST_HOST}v1/namespaces", json=error_body, status_code=409)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    with pytest.raises(AlreadyExistsError) as e:
        catalog.create_namespace(namespace)
    assert "Namespace already exists" in str(e.value)
+
+
def test_drop_namespace_404(rest_mock: Mocker):
    """Dropping an unknown namespace raises NoSuchNamespaceError."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace does not exist: leden in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.delete(f"{TEST_HOST}v1/namespaces/{namespace}", json=error_body, status_code=404)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as e:
        catalog.drop_namespace(namespace)
    assert "Namespace does not exist" in str(e.value)
+
+
def test_load_namespace_properties_200(rest_mock: Mocker):
    """Namespace properties come back from the namespace GET endpoint.

    FIX: the original mocked status_code=204 (No Content) while still
    returning a JSON body; the test name and the REST catalog spec both
    say a successful GetNamespace response is a 200 with a body.
    """
    namespace = "leden"
    rest_mock.get(
        f"{TEST_HOST}v1/namespaces/{namespace}",
        json={"namespace": ["fokko"], "properties": {"prop": "yes"}},
        status_code=200,
    )
    assert RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN).load_namespace_properties(namespace) == {"prop": "yes"}
+
+
def test_load_namespace_properties_404(rest_mock: Mocker):
    """Loading properties of an unknown namespace raises NoSuchNamespaceError."""
    namespace = "leden"
    error_body = {
        "error": {
            "message": "Namespace does not exist: fokko22 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.get(f"{TEST_HOST}v1/namespaces/{namespace}", json=error_body, status_code=404)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as e:
        catalog.load_namespace_properties(namespace)
    assert "Namespace does not exist" in str(e.value)
+
+
def test_update_namespace_properties_200(rest_mock: Mocker):
    """The server's update summary is parsed into a PropertiesUpdateSummary."""
    rest_mock.post(
        f"{TEST_HOST}v1/namespaces/fokko/properties",
        json={"removed": [], "updated": ["prop"], "missing": ["abc"]},
        status_code=200,
    )
    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    summary = catalog.update_namespace_properties(("fokko",), {"abc"}, {"prop": "yes"})

    assert summary == PropertiesUpdateSummary(removed=[], updated=["prop"], missing=["abc"])


def test_update_namespace_properties_404(rest_mock: Mocker):
    """Updating properties of an unknown namespace raises NoSuchNamespaceError."""
    error_body = {
        "error": {
            "message": "Namespace does not exist: does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.post(f"{TEST_HOST}v1/namespaces/fokko/properties", json=error_body, status_code=404)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as e:
        catalog.update_namespace_properties(("fokko",), {"abc"}, {"prop": "yes"})
    assert "Namespace does not exist" in str(e.value)
+
+
def test_load_table_200(rest_mock: Mocker):
    """load_table parses a full REST LoadTableResponse into a Table.

    Exercises the whole TableMetadataV1 surface: schemas, specs, sort
    orders, snapshots (with Summary), refs, and the two log lists.
    """
    rest_mock.get(
        f"{TEST_HOST}v1/namespaces/fokko/tables/table",
        json={
            "metadataLocation": "s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
            "metadata": {
                "format-version": 1,
                "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
                "location": "s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f",
                "last-updated-ms": 1646787054459,
                "last-column-id": 2,
                # v1 carries both the legacy singular "schema" and the list form.
                "schema": {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {"id": 1, "name": "id", "required": False, "type": "int"},
                        {"id": 2, "name": "data", "required": False, "type": "string"},
                    ],
                },
                "current-schema-id": 0,
                "schemas": [
                    {
                        "type": "struct",
                        "schema-id": 0,
                        "fields": [
                            {"id": 1, "name": "id", "required": False, "type": "int"},
                            {"id": 2, "name": "data", "required": False, "type": "string"},
                        ],
                    }
                ],
                "partition-spec": [],
                "default-spec-id": 0,
                "partition-specs": [{"spec-id": 0, "fields": []}],
                "last-partition-id": 999,
                "default-sort-order-id": 0,
                "sort-orders": [{"order-id": 0, "fields": []}],
                "properties": {"owner": "bryan", "write.metadata.compression-codec": "gzip"},
                "current-snapshot-id": 3497810964824022504,
                "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}},
                "snapshots": [
                    {
                        "snapshot-id": 3497810964824022504,
                        "timestamp-ms": 1646787054459,
                        "summary": {
                            "operation": "append",
                            "spark.app.id": "local-1646787004168",
                            "added-data-files": "1",
                            "added-records": "1",
                            "added-files-size": "697",
                            "changed-partition-count": "1",
                            "total-records": "1",
                            "total-files-size": "697",
                            "total-data-files": "1",
                            "total-delete-files": "0",
                            "total-position-deletes": "0",
                            "total-equality-deletes": "0",
                        },
                        "manifest-list": "s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
                        "schema-id": 0,
                    }
                ],
                "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}],
                "metadata-log": [
                    {
                        "timestamp-ms": 1646787031514,
                        "metadata-file": "s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
                    }
                ],
            },
            "config": {"client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"},
        },
        status_code=200,
    )
    table = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN).load_table(("fokko", "table"))
    # The identifier is prefixed with the catalog name ("rest").
    assert table == Table(
        identifier=("rest", "fokko", "table"),
        metadata_location=None,
        metadata=TableMetadataV1(
            location="s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f",
            table_uuid=UUID("b55d9dda-6561-423a-8bfc-787980ce421f"),
            last_updated_ms=1646787054459,
            last_column_id=2,
            schemas=[
                Schema(
                    NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
                    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
                    schema_id=0,
                    identifier_field_ids=[],
                )
            ],
            current_schema_id=0,
            partition_specs=[PartitionSpec(spec_id=0, fields=())],
            default_spec_id=0,
            last_partition_id=999,
            properties={"owner": "bryan", "write.metadata.compression-codec": "gzip"},
            current_snapshot_id=3497810964824022504,
            snapshots=[
                Snapshot(
                    snapshot_id=3497810964824022504,
                    parent_snapshot_id=None,
                    sequence_number=None,
                    timestamp_ms=1646787054459,
                    manifest_list="s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
                    summary=Summary(
                        operation=Operation.APPEND,
                        # Summary's extra keys contain dots/dashes, so they are
                        # passed via ** rather than as keyword identifiers.
                        **{  # type: ignore
                            "spark.app.id": "local-1646787004168",
                            "added-data-files": "1",
                            "added-records": "1",
                            "added-files-size": "697",
                            "changed-partition-count": "1",
                            "total-records": "1",
                            "total-files-size": "697",
                            "total-data-files": "1",
                            "total-delete-files": "0",
                            "total-position-deletes": "0",
                            "total-equality-deletes": "0",
                        },
                    ),
                    schema_id=0,
                )
            ],
            snapshot_log=[{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}],
            metadata_log=[
                {
                    "timestamp-ms": 1646787031514,
                    "metadata-file": "s3://tabular-public-us-west-2-dev/bb30733e-8769-4dab-aa1b-e76245bb2bd4/b55d9dda-6561-423a-8bfc-787980ce421f/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
                }
            ],
            sort_orders=[SortOrder(order_id=0)],
            default_sort_order_id=0,
            refs={
                "main": SnapshotRef(
                    snapshot_id=3497810964824022504,
                    snapshot_ref_type=SnapshotRefType.BRANCH,
                    min_snapshots_to_keep=None,
                    max_snapshot_age_ms=None,
                    max_ref_age_ms=None,
                )
            },
            format_version=1,
            schema_=Schema(
                NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
                NestedField(field_id=2, name="data", field_type=StringType(), required=False),
                schema_id=0,
                identifier_field_ids=[],
            ),
            partition_spec=[],
        ),
        config={"client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"},
    )
+
+
def test_load_table_404(rest_mock: Mocker):
    # A 404 from the table GET endpoint surfaces as NoSuchTableError.
    rest_mock.get(
        f"{TEST_HOST}v1/namespaces/fokko/tables/does_not_exists",
        json={
            "error": {
                "message": "Table does not exist: examples.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
                # NOTE(review): "NoSuchNamespaceErrorException" looks odd for a
                # missing *table*; presumably the client keys off the 404 status
                # rather than this type string — confirm against rest.py.
                "type": "NoSuchNamespaceErrorException",
                "code": 404,
            }
        },
        status_code=404,
    )

    with pytest.raises(NoSuchTableError) as e:
        RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN).load_table(("fokko", "does_not_exists"))
    assert "Table does not exist" in str(e.value)


def test_drop_table_404(rest_mock: Mocker):
    # Same mapping on DELETE: 404 -> NoSuchTableError.
    rest_mock.delete(
        f"{TEST_HOST}v1/namespaces/fokko/tables/does_not_exists",
        json={
            "error": {
                "message": "Table does not exist: fokko.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
                "type": "NoSuchNamespaceErrorException",
                "code": 404,
            }
        },
        status_code=404,
    )

    with pytest.raises(NoSuchTableError) as e:
        RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN).drop_table(("fokko", "does_not_exists"))
    assert "Table does not exist" in str(e.value)
+
+
def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema):
    """create_table posts the request and parses the returned metadata.

    Note the expected object reflects what the *server* returned, not what
    was requested: the response carries spec-id 0 with no fields (the
    requested truncate spec is not echoed back), and the payload's
    current-snapshot-id of -1 is normalized to None.
    """
    rest_mock.post(
        f"{TEST_HOST}v1/namespaces/fokko/tables",
        json={
            "metadataLocation": None,
            "metadata": {
                "format-version": 1,
                "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
                "location": "s3://tabular-wh-us-west-2-dev/8bcb0838-50fc-472d-9ddb-8feb89ef5f1e/bf289591-dcc0-4234-ad4f-5c3eed811a29",
                "last-updated-ms": 1657810967051,
                "last-column-id": 3,
                "schema": {
                    "type": "struct",
                    "schema-id": 0,
                    "identifier-field-ids": [2],
                    "fields": [
                        {"id": 1, "name": "foo", "required": False, "type": "string"},
                        {"id": 2, "name": "bar", "required": True, "type": "int"},
                        {"id": 3, "name": "baz", "required": False, "type": "boolean"},
                    ],
                },
                "current-schema-id": 0,
                "schemas": [
                    {
                        "type": "struct",
                        "schema-id": 0,
                        "identifier-field-ids": [2],
                        "fields": [
                            {"id": 1, "name": "foo", "required": False, "type": "string"},
                            {"id": 2, "name": "bar", "required": True, "type": "int"},
                            {"id": 3, "name": "baz", "required": False, "type": "boolean"},
                        ],
                    }
                ],
                "partition-spec": [],
                "default-spec-id": 0,
                "partition-specs": [{"spec-id": 0, "fields": []}],
                "last-partition-id": 999,
                "default-sort-order-id": 0,
                "sort-orders": [{"order-id": 0, "fields": []}],
                "properties": {
                    "write.delete.parquet.compression-codec": "zstd",
                    "write.metadata.compression-codec": "gzip",
                    "write.summary.partition-limit": "100",
                    "write.parquet.compression-codec": "zstd",
                },
                "current-snapshot-id": -1,
                "refs": {},
                "snapshots": [],
                "snapshot-log": [],
                "metadata-log": [],
            },
            "config": {
                "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
                "region": "us-west-2",
            },
        },
        status_code=200,
    )
    table = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN).create_table(
        identifier=("fokko", "fokko2"),
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(
            spec_id=1, fields=(PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id"),)
        ),
        sort_order=SortOrder(1, SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )
    assert table == Table(
        identifier=("rest", "fokko", "fokko2"),
        metadata_location=None,
        metadata=TableMetadataV1(
            location="s3://tabular-wh-us-west-2-dev/8bcb0838-50fc-472d-9ddb-8feb89ef5f1e/bf289591-dcc0-4234-ad4f-5c3eed811a29",
            table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
            last_updated_ms=1657810967051,
            last_column_id=3,
            schemas=[
                Schema(
                    NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
                    NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
                    NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
                    schema_id=0,
                    identifier_field_ids=[2],
                )
            ],
            current_schema_id=0,
            partition_specs=[PartitionSpec(spec_id=0, fields=())],
            default_spec_id=0,
            last_partition_id=999,
            properties={
                "write.delete.parquet.compression-codec": "zstd",
                "write.metadata.compression-codec": "gzip",
                "write.summary.partition-limit": "100",
                "write.parquet.compression-codec": "zstd",
            },
            current_snapshot_id=None,
            snapshots=[],
            snapshot_log=[],
            metadata_log=[],
            sort_orders=[SortOrder(order_id=0)],
            default_sort_order_id=0,
            refs={},
            format_version=1,
            schema_=Schema(
                NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
                NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
                NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
                schema_id=0,
                identifier_field_ids=[2],
            ),
            partition_spec=[],
        ),
    )
+
+
def test_create_table_409(rest_mock, table_schema_simple: Schema):
    """A 409 conflict on create_table raises TableAlreadyExistsError."""
    error_body = {
        "error": {
            "message": "Table already exists: fokko.already_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(f"{TEST_HOST}v1/namespaces/fokko/tables", json=error_body, status_code=409)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    requested_spec = PartitionSpec(
        spec_id=1,
        fields=(PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id"),),
    )
    requested_order = SortOrder(1, SortField(source_id=2, transform=IdentityTransform()))

    with pytest.raises(TableAlreadyExistsError) as e:
        catalog.create_table(
            identifier=("fokko", "fokko2"),
            schema=table_schema_simple,
            location=None,
            partition_spec=requested_spec,
            sort_order=requested_order,
            properties={"owner": "fokko"},
        )
    assert "Table already exists" in str(e.value)
+
+
def test_delete_namespace_204(rest_mock: Mocker):
    """Dropping a namespace succeeds on a 204 No Content response."""
    namespace = "example"
    rest_mock.delete(f"{TEST_HOST}v1/namespaces/{namespace}", json={}, status_code=204)
    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    catalog.drop_namespace(namespace)


def test_delete_table_204(rest_mock: Mocker):
    """Dropping a table succeeds on a 204 No Content response."""
    rest_mock.delete(f"{TEST_HOST}v1/namespaces/example/tables/fokko", json={}, status_code=204)
    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    catalog.drop_table(("example", "fokko"))
+
+
def test_delete_table_404(rest_mock: Mocker):
    """Deleting an unknown table raises NoSuchTableError."""
    error_body = {
        "error": {
            "message": "Table does not exist: fokko.fokko2 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceErrorException",
            "code": 404,
        }
    }
    rest_mock.delete(f"{TEST_HOST}v1/namespaces/example/tables/fokko", json=error_body, status_code=404)

    catalog = RestCatalog("rest", {}, TEST_HOST, token=TEST_TOKEN)
    with pytest.raises(NoSuchTableError) as e:
        catalog.drop_table(("example", "fokko"))
    assert "Table does not exist" in str(e.value)
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index 1b004ec8f..7aa929039 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -76,7 +76,7 @@ def table_schema_simple():
         NestedField(field_id=2, name="bar", field_type=IntegerType(), 
required=True),
         NestedField(field_id=3, name="baz", field_type=BooleanType(), 
required=False),
         schema_id=1,
-        identifier_field_ids=[1],
+        identifier_field_ids=[2],
     )
 
 
diff --git a/python/tests/table/test_metadata.py 
b/python/tests/table/test_metadata.py
index 0a58f201d..9c7352d51 100644
--- a/python/tests/table/test_metadata.py
+++ b/python/tests/table/test_metadata.py
@@ -207,13 +207,13 @@ def test_updating_metadata():
 
 def test_serialize_v1():
     table_metadata = TableMetadataV1(**EXAMPLE_TABLE_METADATA_V1).json()
-    expected = """{"location": "s3://bucket/test/location", "table-uuid": 
"d20125c8-7284-442c-9aea-15fee620737c", "last-updated-ms": 1602638573874, 
"last-column-id": 3, "schemas": [{"fields": [{"id": 1, "name": "x", "type": 
"long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": 
true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": 
true}], "schema-id": 0, "identifier-field-ids": []}], "current-schema-id": 0, 
"partition-specs": [{"spec-id": 0, " [...]
+    expected = """{"location": "s3://bucket/test/location", "table-uuid": 
"d20125c8-7284-442c-9aea-15fee620737c", "last-updated-ms": 1602638573874, 
"last-column-id": 3, "schemas": [{"type": "struct", "fields": [{"id": 1, 
"name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": 
"long", "required": true, "doc": "comment"}, {"id": 3, "name": "z", "type": 
"long", "required": true}], "schema-id": 0, "identifier-field-ids": []}], 
"current-schema-id": 0, "partition-specs": [...]
     assert table_metadata == expected
 
 
 def test_serialize_v2():
     table_metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2).json()
-    expected = """{"location": "s3://bucket/test/location", "table-uuid": 
"9c12d441-03fe-4693-9a96-a0705ddf69c1", "last-updated-ms": 1602638573590, 
"last-column-id": 3, "schemas": [{"fields": [{"id": 1, "name": "x", "type": 
"long", "required": true}], "schema-id": 0, "identifier-field-ids": []}, 
{"fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, 
"name": "y", "type": "long", "required": true, "doc": "comment"}, {"id": 3, 
"name": "z", "type": "long", "required" [...]
+    expected = """{"location": "s3://bucket/test/location", "table-uuid": 
"9c12d441-03fe-4693-9a96-a0705ddf69c1", "last-updated-ms": 1602638573590, 
"last-column-id": 3, "schemas": [{"type": "struct", "fields": [{"id": 1, 
"name": "x", "type": "long", "required": true}], "schema-id": 0, 
"identifier-field-ids": []}, {"type": "struct", "fields": [{"id": 1, "name": 
"x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", 
"required": true, "doc": "comment"}, {"id": 3, "na [...]
     assert table_metadata == expected
 
 
@@ -505,6 +505,7 @@ def test_v1_write_metadata_for_v2():
             ],
             "identifier-field-ids": [],
             "schema-id": 0,
+            "type": "struct",
         }
     ]
     assert metadata_v2["partition-specs"] == [
@@ -525,3 +526,54 @@ def test_v1_write_metadata_for_v2():
 def test_v2_ref_creation():
     table_metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2)
     assert table_metadata.refs == {"main": 
SnapshotRef(snapshot_id=3055729675574597004, 
snapshot_ref_type=SnapshotRefType.BRANCH)}
+
+
def test_metadata_v1():
    """A realistic V1 payload parses, and its core fields survive parsing.

    The original only checked that construction did not raise; assert a few
    parsed attributes so regressions in the field aliases are caught too.
    """
    valid_v1 = {
        "format-version": 1,
        "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
        "location": "s3://tabular-wh-us-west-2-dev/8bcb0838-50fc-472d-9ddb-8feb89ef5f1e/bf289591-dcc0-4234-ad4f-5c3eed811a29",
        "last-updated-ms": 1657810967051,
        "last-column-id": 3,
        "schema": {
            "type": "struct",
            "schema-id": 0,
            "identifier-field-ids": [2],
            "fields": [
                {"id": 1, "name": "foo", "required": False, "type": "string"},
                {"id": 2, "name": "bar", "required": True, "type": "int"},
                {"id": 3, "name": "baz", "required": False, "type": "boolean"},
            ],
        },
        "current-schema-id": 0,
        "schemas": [
            {
                "type": "struct",
                "schema-id": 0,
                "identifier-field-ids": [2],
                "fields": [
                    {"id": 1, "name": "foo", "required": False, "type": "string"},
                    {"id": 2, "name": "bar", "required": True, "type": "int"},
                    {"id": 3, "name": "baz", "required": False, "type": "boolean"},
                ],
            }
        ],
        "partition-spec": [],
        "default-spec-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": []}],
        "last-partition-id": 999,
        "default-sort-order-id": 0,
        "sort-orders": [{"order-id": 0, "fields": []}],
        "properties": {
            "write.delete.parquet.compression-codec": "zstd",
            "write.metadata.compression-codec": "gzip",
            "write.summary.partition-limit": "100",
            "write.parquet.compression-codec": "zstd",
        },
        "current-snapshot-id": -1,
        "refs": {},
        "snapshots": [],
        "snapshot-log": [],
        "metadata-log": [],
    }
    parsed = TableMetadataV1(**valid_v1)

    assert parsed.format_version == 1
    assert str(parsed.table_uuid) == "bf289591-dcc0-4234-ad4f-5c3eed811a29"
    assert parsed.last_column_id == 3
    assert parsed.current_schema_id == 0
    assert parsed.last_partition_id == 999
diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py
index 3d61323f0..ad094ab93 100644
--- a/python/tests/test_schema.py
+++ b/python/tests/test_schema.py
@@ -406,13 +406,13 @@ def test_build_position_accessors_with_struct(table_schema_nested: Schema):
 
 def test_serialize_schema(table_schema_simple: Schema):
     actual = table_schema_simple.json()
-    expected = """{"fields": [{"id": 1, "name": "foo", "type": "string", "required": false}, {"id": 2, "name": "bar", "type": "int", "required": true}, {"id": 3, "name": "baz", "type": "boolean", "required": false}], "schema-id": 1, "identifier-field-ids": [1]}"""
+    expected = """{"type": "struct", "fields": [{"id": 1, "name": "foo", "type": "string", "required": false}, {"id": 2, "name": "bar", "type": "int", "required": true}, {"id": 3, "name": "baz", "type": "boolean", "required": false}], "schema-id": 1, "identifier-field-ids": [2]}"""
     assert actual == expected
 
 
 def test_deserialize_schema(table_schema_simple: Schema):
     actual = Schema.parse_raw(
-        """{"fields": [{"id": 1, "name": "foo", "type": "string", "required": false}, {"id": 2, "name": "bar", "type": "int", "required": true}, {"id": 3, "name": "baz", "type": "boolean", "required": false}], "schema-id": 1, "identifier-field-ids": [1]}"""
+        """{"type": "struct", "fields": [{"id": 1, "name": "foo", "type": "string", "required": false}, {"id": 2, "name": "bar", "type": "int", "required": true}, {"id": 3, "name": "baz", "type": "boolean", "required": false}], "schema-id": 1, "identifier-field-ids": [2]}"""
     )
     expected = table_schema_simple
     assert actual == expected
diff --git a/python/tests/utils/test_bin_packing.py b/python/tests/utils/test_bin_packing.py
index 71fd53c02..5447dfe92 100644
--- a/python/tests/utils/test_bin_packing.py
+++ b/python/tests/utils/test_bin_packing.py
@@ -19,7 +19,6 @@ import random
 
 import pytest
 
-from pyiceberg.schema import Schema
 from pyiceberg.utils.bin_packing import PackingIterator
 
 
@@ -82,17 +81,3 @@ def test_bin_packing_lookback(splits, target_weight, lookback, largest_bin_first
         return x
 
     assert list(PackingIterator(splits, target_weight, lookback, weight_func, largest_bin_first)) == expected_lists
-
-
-def test_serialize_schema(table_schema_simple: Schema):
-    actual = table_schema_simple.json()
-    expected = """{"fields": [{"id": 1, "name": "foo", "type": "string", "required": false}, {"id": 2, "name": "bar", "type": "int", "required": true}, {"id": 3, "name": "baz", "type": "boolean", "required": false}], "schema-id": 1, "identifier-field-ids": [1]}"""
-    assert actual == expected
-
-
-def test_deserialize_schema(table_schema_simple: Schema):
-    actual = Schema.parse_raw(
-        """{"fields": [{"id": 1, "name": "foo", "type": "string", "required": false}, {"id": 2, "name": "bar", "type": "int", "required": true}, {"id": 3, "name": "baz", "type": "boolean", "required": false}], "schema-id": 1, "identifier-field-ids": [1]}"""
-    )
-    expected = table_schema_simple
-    assert actual == expected

Reply via email to