This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 06e2b2d  Add Hive integration tests (#207)
06e2b2d is described below

commit 06e2b2df48e9ff383a09550e3e307d290fe39ddc
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jan 17 16:30:43 2024 +0100

    Add Hive integration tests (#207)
    
    * Add Hive for CI
    
    * Add Hive integration tests
    
    * Add missing licenses
    
    * Fix
    
    * Remove Arrow
    
    * Add catalog
    
    * Update test suite
    
    * Whitespace
---
 dev/Dockerfile                                     |  10 +-
 dev/docker-compose-integration.yml                 |  14 +
 dev/hive/Dockerfile                                |  34 ++
 dev/hive/core-site.xml                             |  53 +++
 dev/provision.py                                   | 527 ++++++++++-----------
 dev/spark-defaults.conf                            |  20 +-
 tests/integration/__init__.py                      |  16 +
 .../test_catalogs.py}                              | 145 +++---
 .../test_rest_manifest.py}                         |   0
 .../test_rest_schema.py}                           |   0
 10 files changed, 466 insertions(+), 353 deletions(-)

diff --git a/dev/Dockerfile b/dev/Dockerfile
index 1f001f5..44783d0 100644
--- a/dev/Dockerfile
+++ b/dev/Dockerfile
@@ -38,9 +38,8 @@ WORKDIR ${SPARK_HOME}
 
 ENV SPARK_VERSION=3.4.2
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
-ENV ICEBERG_VERSION=1.4.0
-ENV AWS_SDK_VERSION=2.20.18
-ENV PYICEBERG_VERSION=0.4.0
+ENV ICEBERG_VERSION=1.4.2
+ENV PYICEBERG_VERSION=0.5.1
 
 RUN curl --retry 3 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
  && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
@@ -51,8 +50,7 @@ RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runt
  && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars
 
 # Download AWS bundle
-RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
- && mv iceberg-aws-bundle-${ICEBERG_VERSION}.jar /opt/spark/jars
+RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
 
 COPY spark-defaults.conf /opt/spark/conf
 ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
@@ -62,7 +60,7 @@ RUN chmod u+x /opt/spark/sbin/* && \
 
 RUN pip3 install -q ipython
 
-RUN pip3 install "pyiceberg[s3fs]==${PYICEBERG_VERSION}"
+RUN pip3 install "pyiceberg[s3fs,hive]==${PYICEBERG_VERSION}"
 
 COPY entrypoint.sh .
 COPY provision.py .
diff --git a/dev/docker-compose-integration.yml b/dev/docker-compose-integration.yml
index 658bd69..fccdcdc 100644
--- a/dev/docker-compose-integration.yml
+++ b/dev/docker-compose-integration.yml
@@ -25,6 +25,7 @@ services:
       iceberg_net:
     depends_on:
       - rest
+      - hive
       - minio
     volumes:
       - ./warehouse:/home/iceberg/warehouse
@@ -37,6 +38,7 @@ services:
       - 8080:8080
     links:
       - rest:rest
+      - hive:hive
       - minio:minio
   rest:
     image: tabulario/iceberg-rest
@@ -85,5 +87,17 @@ services:
       /usr/bin/mc policy set public minio/warehouse;
       tail -f /dev/null
       "
+  hive:
+    build: hive/
+    container_name: hive
+    hostname: hive
+    networks:
+      iceberg_net:
+    ports:
+      - 9083:9083
+    environment:
+      SERVICE_NAME: "metastore"
+      SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
+
 networks:
   iceberg_net:
diff --git a/dev/hive/Dockerfile b/dev/hive/Dockerfile
new file mode 100644
index 0000000..ee63393
--- /dev/null
+++ b/dev/hive/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
diff --git a/dev/hive/core-site.xml b/dev/hive/core-site.xml
new file mode 100644
index 0000000..b77332b
--- /dev/null
+++ b/dev/hive/core-site.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>s3a://warehouse/hive</value>
+    </property>
+    <property>
+        <name>fs.s3a.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+    </property>
+    <property>
+        <name>fs.s3a.fast.upload</name>
+        <value>true</value>
+    </property>
+    <property>
+      <name>fs.s3a.endpoint</name>
+      <value>http://minio:9000</value>
+    </property>
+    <property>
+      <name>fs.s3a.access.key</name>
+      <value>admin</value>
+    </property>
+    <property>
+      <name>fs.s3a.secret.key</name>
+      <value>password</value>
+    </property>
+    <property>
+      <name>fs.s3a.connection.ssl.enabled</name>
+      <value>false</value>
+    </property>
+    <property>
+      <name>fs.s3a.path.style.access</name>
+      <value>true</value>
+    </property>
+</configuration>
diff --git a/dev/provision.py b/dev/provision.py
index 9917cd3..e5048d2 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -24,318 +24,299 @@ from pyiceberg.types import FixedType, NestedField, UUIDType
 
 spark = SparkSession.builder.getOrCreate()
 
-spark.sql(
-    """
-  CREATE DATABASE IF NOT EXISTS default;
-"""
-)
-
-schema = Schema(
-    NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
-    NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False),
-)
-
-catalog = load_catalog(
-    "local",
-    **{
-        "type": "rest",
-        "uri": "http://rest:8181",
-        "s3.endpoint": "http://minio:9000",
-        "s3.access-key-id": "admin",
-        "s3.secret-access-key": "password",
-    },
-)
-
-catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema)
-
-spark.sql(
-    """
-    INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES
-    ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)),
-    ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)),
-    ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)),
-    ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)),
-    ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY));
+catalogs = {
+    'rest': load_catalog(
+        "rest",
+        **{
+            "type": "rest",
+            "uri": "http://rest:8181",
+            "s3.endpoint": "http://minio:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    ),
+    'hive': load_catalog(
+        "hive",
+        **{
+            "type": "hive",
+            "uri": "http://hive:9083",
+            "s3.endpoint": "http://minio:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    ),
+}
+
+for catalog_name, catalog in catalogs.items():
+    spark.sql(
+        f"""
+      CREATE DATABASE IF NOT EXISTS {catalog_name}.default;
     """
-)
+    )
 
-spark.sql(
-    """
-  CREATE OR REPLACE TABLE default.test_null_nan
-  USING iceberg
-  AS SELECT
-    1            AS idx,
-    float('NaN') AS col_numeric
-UNION ALL SELECT
-    2            AS idx,
-    null         AS col_numeric
-UNION ALL SELECT
-    3            AS idx,
-    1            AS col_numeric
-"""
-)
-
-spark.sql(
-    """
-  CREATE OR REPLACE TABLE default.test_null_nan_rewritten
-  USING iceberg
-  AS SELECT * FROM default.test_null_nan
-"""
-)
+    schema = Schema(
+        NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
+        NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False),
+    )
 
-spark.sql(
-    """
-CREATE OR REPLACE TABLE default.test_limit as
-  SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx;
-"""
-)
+    catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema)
 
-spark.sql(
-    """
-CREATE OR REPLACE TABLE default.test_positional_mor_deletes (
-    dt     date,
-    number integer,
-    letter string
-)
-USING iceberg
-TBLPROPERTIES (
-    'write.delete.mode'='merge-on-read',
-    'write.update.mode'='merge-on-read',
-    'write.merge.mode'='merge-on-read',
-    'format-version'='2'
-);
-"""
-)
-
-# Partitioning is not really needed, but there is a bug:
-# https://github.com/apache/iceberg/pull/7685
-spark.sql(
-    """
-    ALTER TABLE default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years
-"""
-)
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_uuid_and_fixed_unpartitioned VALUES
+        ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)),
+        ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)),
+        ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)),
+        ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)),
+        ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY));
+        """
+    )
 
-spark.sql(
-    """
-INSERT INTO default.test_positional_mor_deletes
-VALUES
-    (CAST('2023-03-01' AS date), 1, 'a'),
-    (CAST('2023-03-02' AS date), 2, 'b'),
-    (CAST('2023-03-03' AS date), 3, 'c'),
-    (CAST('2023-03-04' AS date), 4, 'd'),
-    (CAST('2023-03-05' AS date), 5, 'e'),
-    (CAST('2023-03-06' AS date), 6, 'f'),
-    (CAST('2023-03-07' AS date), 7, 'g'),
-    (CAST('2023-03-08' AS date), 8, 'h'),
-    (CAST('2023-03-09' AS date), 9, 'i'),
-    (CAST('2023-03-10' AS date), 10, 'j'),
-    (CAST('2023-03-11' AS date), 11, 'k'),
-    (CAST('2023-03-12' AS date), 12, 'l');
-"""
-)
-
-spark.sql(
-    """
-ALTER TABLE default.test_positional_mor_deletes CREATE TAG tag_12
+    spark.sql(
+        f"""
+      CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan
+      USING iceberg
+      AS SELECT
+        1            AS idx,
+        float('NaN') AS col_numeric
+    UNION ALL SELECT
+        2            AS idx,
+        null         AS col_numeric
+    UNION ALL SELECT
+        3            AS idx,
+        1            AS col_numeric
     """
-)
+    )
 
-spark.sql(
-    """
-ALTER TABLE default.test_positional_mor_deletes CREATE BRANCH without_5
+    spark.sql(
+        f"""
+      CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan_rewritten
+      USING iceberg
+      AS SELECT * FROM default.test_null_nan
     """
-)
+    )
 
-spark.sql(
-    """
-DELETE FROM default.test_positional_mor_deletes.branch_without_5 WHERE number = 5
+    spark.sql(
+        f"""
+    CREATE OR REPLACE TABLE {catalog_name}.default.test_limit as
+      SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx;
     """
-)
+    )
 
-spark.sql(
+    spark.sql(
+        f"""
+    CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_deletes (
+        dt     date,
+        number integer,
+        letter string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'write.delete.mode'='merge-on-read',
+        'write.update.mode'='merge-on-read',
+        'write.merge.mode'='merge-on-read',
+        'format-version'='2'
+    );
     """
-DELETE FROM default.test_positional_mor_deletes WHERE number = 9
-"""
-)
+    )
 
-spark.sql(
-    """
-  CREATE OR REPLACE TABLE default.test_positional_mor_double_deletes (
-    dt     date,
-    number integer,
-    letter string
-  )
-  USING iceberg
-  TBLPROPERTIES (
-    'write.delete.mode'='merge-on-read',
-    'write.update.mode'='merge-on-read',
-    'write.merge.mode'='merge-on-read',
-    'format-version'='2'
-  );
-"""
-)
-
-# Partitioning is not really needed, but there is a bug:
-# https://github.com/apache/iceberg/pull/7685
-spark.sql(
-    """
-    ALTER TABLE default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years
-"""
-)
+    # Partitioning is not really needed, but there is a bug:
+    # https://github.com/apache/iceberg/pull/7685
+    spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years")
 
-spark.sql(
-    """
-INSERT INTO default.test_positional_mor_double_deletes
-VALUES
-    (CAST('2023-03-01' AS date), 1, 'a'),
-    (CAST('2023-03-02' AS date), 2, 'b'),
-    (CAST('2023-03-03' AS date), 3, 'c'),
-    (CAST('2023-03-04' AS date), 4, 'd'),
-    (CAST('2023-03-05' AS date), 5, 'e'),
-    (CAST('2023-03-06' AS date), 6, 'f'),
-    (CAST('2023-03-07' AS date), 7, 'g'),
-    (CAST('2023-03-08' AS date), 8, 'h'),
-    (CAST('2023-03-09' AS date), 9, 'i'),
-    (CAST('2023-03-10' AS date), 10, 'j'),
-    (CAST('2023-03-11' AS date), 11, 'k'),
-    (CAST('2023-03-12' AS date), 12, 'l');
-"""
-)
-
-spark.sql(
+    spark.sql(
+        f"""
+    INSERT INTO {catalog_name}.default.test_positional_mor_deletes
+    VALUES
+        (CAST('2023-03-01' AS date), 1, 'a'),
+        (CAST('2023-03-02' AS date), 2, 'b'),
+        (CAST('2023-03-03' AS date), 3, 'c'),
+        (CAST('2023-03-04' AS date), 4, 'd'),
+        (CAST('2023-03-05' AS date), 5, 'e'),
+        (CAST('2023-03-06' AS date), 6, 'f'),
+        (CAST('2023-03-07' AS date), 7, 'g'),
+        (CAST('2023-03-08' AS date), 8, 'h'),
+        (CAST('2023-03-09' AS date), 9, 'i'),
+        (CAST('2023-03-10' AS date), 10, 'j'),
+        (CAST('2023-03-11' AS date), 11, 'k'),
+        (CAST('2023-03-12' AS date), 12, 'l');
     """
-    DELETE FROM default.test_positional_mor_double_deletes WHERE number = 9
-"""
-)
+    )
+
+    spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE TAG tag_12")
+
+    spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE BRANCH without_5")
+
+    spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE number = 5")
+
+    spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes WHERE number = 9")
 
-spark.sql(
-    """
-    DELETE FROM default.test_positional_mor_double_deletes WHERE letter == 'f'
-"""
-)
-
-all_types_dataframe = (
-    spark.range(0, 5, 1, 5)
-    .withColumnRenamed("id", "longCol")
-    .withColumn("intCol", expr("CAST(longCol AS INT)"))
-    .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
-    .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
-    .withColumn("dateCol", date_add(current_date(), 1))
-    .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
-    .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
-    .withColumn("booleanCol", expr("longCol > 5"))
-    .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
-    .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
-    .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
-    .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
-    .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
-    .withColumn("arrayCol", expr("ARRAY(longCol)"))
-    .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))
-)
-
-all_types_dataframe.writeTo("default.test_all_types").tableProperty("format-version", "2").partitionedBy(
-    "intCol"
-).createOrReplace()
-
-for table_name, partition in [
-    ("test_partitioned_by_identity", "ts"),
-    ("test_partitioned_by_years", "years(dt)"),
-    ("test_partitioned_by_months", "months(dt)"),
-    ("test_partitioned_by_days", "days(ts)"),
-    ("test_partitioned_by_hours", "hours(ts)"),
-    ("test_partitioned_by_truncate", "truncate(1, letter)"),
-    ("test_partitioned_by_bucket", "bucket(16, number)"),
-]:
     spark.sql(
         f"""
-      CREATE OR REPLACE TABLE default.{table_name} (
+      CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_double_deletes (
         dt     date,
-        ts     timestamp,
         number integer,
         letter string
       )
-      USING iceberg;
+      USING iceberg
+      TBLPROPERTIES (
+        'write.delete.mode'='merge-on-read',
+        'write.update.mode'='merge-on-read',
+        'write.merge.mode'='merge-on-read',
+        'format-version'='2'
+      );
     """
     )
 
-    spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD {partition}")
+    # Partitioning is not really needed, but there is a bug:
+    # https://github.com/apache/iceberg/pull/7685
+    spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years")
 
     spark.sql(
         f"""
-    INSERT INTO default.{table_name}
+    INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes
     VALUES
-        (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'),
-        (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'),
-        (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'),
-        (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'),
-        (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'),
-        (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'),
-        (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'),
-        (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'),
-        (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'),
-        (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'),
-        (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'),
-        (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
+        (CAST('2023-03-01' AS date), 1, 'a'),
+        (CAST('2023-03-02' AS date), 2, 'b'),
+        (CAST('2023-03-03' AS date), 3, 'c'),
+        (CAST('2023-03-04' AS date), 4, 'd'),
+        (CAST('2023-03-05' AS date), 5, 'e'),
+        (CAST('2023-03-06' AS date), 6, 'f'),
+        (CAST('2023-03-07' AS date), 7, 'g'),
+        (CAST('2023-03-08' AS date), 8, 'h'),
+        (CAST('2023-03-09' AS date), 9, 'i'),
+        (CAST('2023-03-10' AS date), 10, 'j'),
+        (CAST('2023-03-11' AS date), 11, 'k'),
+        (CAST('2023-03-12' AS date), 12, 'l');
     """
     )
 
-# There is an issue with CREATE OR REPLACE
-# https://github.com/apache/iceberg/issues/8756
-spark.sql(
+    spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9")
+
+    spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'")
+
+    all_types_dataframe = (
+        spark.range(0, 5, 1, 5)
+        .withColumnRenamed("id", "longCol")
+        .withColumn("intCol", expr("CAST(longCol AS INT)"))
+        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+        .withColumn("dateCol", date_add(current_date(), 1))
+        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+        .withColumn("booleanCol", expr("longCol > 5"))
+        .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
+        .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
+        .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
+        .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
+        .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
+        .withColumn("arrayCol", expr("ARRAY(longCol)"))
+        .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))
+    )
+
+    all_types_dataframe.writeTo(f"{catalog_name}.default.test_all_types").tableProperty("format-version", "2").partitionedBy(
+        "intCol"
+    ).createOrReplace()
+
+    for table_name, partition in [
+        ("test_partitioned_by_identity", "ts"),
+        ("test_partitioned_by_years", "years(dt)"),
+        ("test_partitioned_by_months", "months(dt)"),
+        ("test_partitioned_by_days", "days(ts)"),
+        ("test_partitioned_by_hours", "hours(ts)"),
+        ("test_partitioned_by_truncate", "truncate(1, letter)"),
+        ("test_partitioned_by_bucket", "bucket(16, number)"),
+    ]:
+        spark.sql(
+            f"""
+          CREATE OR REPLACE TABLE {catalog_name}.default.{table_name} (
+            dt     date,
+            ts     timestamp,
+            number integer,
+            letter string
+          )
+          USING iceberg;
+        """
+        )
+
+        spark.sql(f"ALTER TABLE {catalog_name}.default.{table_name} ADD PARTITION FIELD {partition}")
+
+        spark.sql(
+            f"""
+        INSERT INTO {catalog_name}.default.{table_name}
+        VALUES
+            (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'),
+            (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'),
+            (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'),
+            (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'),
+            (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'),
+            (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'),
+            (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'),
+            (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'),
+            (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'),
+            (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'),
+            (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'),
+            (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
+        """
+        )
+
+    # There is an issue with CREATE OR REPLACE
+    # https://github.com/apache/iceberg/issues/8756
+    spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.default.test_table_version")
+
+    spark.sql(
+        f"""
+    CREATE TABLE {catalog_name}.default.test_table_version (
+        dt     date,
+        number integer,
+        letter string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'format-version'='1'
+    );
     """
-DROP TABLE IF EXISTS default.test_table_version
-"""
-)
+    )
 
-spark.sql(
+    spark.sql(
+        f"""
+    CREATE TABLE {catalog_name}.default.test_table_sanitized_character (
+        `letter/abc` string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'format-version'='1'
+    );
     """
-CREATE TABLE default.test_table_version (
-    dt     date,
-    number integer,
-    letter string
-)
-USING iceberg
-TBLPROPERTIES (
-    'format-version'='1'
-);
-"""
-)
-
-spark.sql(
+    )
+
+    spark.sql(
+        f"""
+    INSERT INTO {catalog_name}.default.test_table_sanitized_character
+    VALUES
+        ('123')
     """
-CREATE TABLE default.test_table_sanitized_character (
-    `letter/abc` string
-)
-USING iceberg
-TBLPROPERTIES (
-    'format-version'='1'
-);
-"""
-)
-
-spark.sql(
+    )
+
+    spark.sql(
+        f"""
+    INSERT INTO {catalog_name}.default.test_table_sanitized_character
+    VALUES
+        ('123')
     """
-INSERT INTO default.test_table_sanitized_character
-VALUES
-    ('123')
-"""
-)
+    )
 
-spark.sql(
+    spark.sql(
+        f"""
+    CREATE TABLE {catalog_name}.default.test_table_add_column (
+        a string
+    )
+    USING iceberg
     """
-CREATE TABLE default.test_table_add_column (
-    a string
-)
-USING iceberg
-"""
-)
+    )
 
-spark.sql("INSERT INTO default.test_table_add_column VALUES ('1')")
+    spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('1')")
 
-spark.sql(
-    """
-ALTER TABLE default.test_table_add_column ADD COLUMN b string
-"""
-)
+    spark.sql(f"ALTER TABLE {catalog_name}.default.test_table_add_column ADD COLUMN b string")
 
-spark.sql("INSERT INTO default.test_table_add_column VALUES ('2', '2')")
+    spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('2', '2')")
diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf
index 56c3454..2316336 100644
--- a/dev/spark-defaults.conf
+++ b/dev/spark-defaults.conf
@@ -16,13 +16,19 @@
 #
 
 spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
-spark.sql.catalog.demo                 org.apache.iceberg.spark.SparkCatalog
-spark.sql.catalog.demo.type            rest
-spark.sql.catalog.demo.uri             http://rest:8181
-spark.sql.catalog.demo.io-impl         org.apache.iceberg.aws.s3.S3FileIO
-spark.sql.catalog.demo.warehouse       s3://warehouse/wh/
-spark.sql.catalog.demo.s3.endpoint     http://minio:9000
-spark.sql.defaultCatalog               demo
+spark.sql.catalog.rest                 org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.rest.type            rest
+spark.sql.catalog.rest.uri             http://rest:8181
+spark.sql.catalog.rest.io-impl         org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.rest.warehouse       s3://warehouse/rest/
+spark.sql.catalog.rest.s3.endpoint     http://minio:9000
+spark.sql.catalog.hive                 org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.hive.type            hive
+spark.sql.catalog.hive.uri             http://hive:9083
+spark.sql.catalog.hive.io-impl         org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.hive.warehouse       s3://warehouse/hive/
+spark.sql.catalog.hive.s3.endpoint     http://minio:9000
+spark.sql.defaultCatalog               rest
 spark.eventLog.enabled                 true
 spark.eventLog.dir                     /home/iceberg/spark-events
 spark.history.fs.logDirectory          /home/iceberg/spark-events
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/integration/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/test_integration.py b/tests/integration/test_catalogs.py
similarity index 70%
rename from tests/test_integration.py
rename to tests/integration/test_catalogs.py
index 2a173be..3fbdb69 100644
--- a/tests/test_integration.py
+++ b/tests/integration/test_catalogs.py
@@ -25,6 +25,7 @@ import pytest
 from pyarrow.fs import S3FileSystem
 
 from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog.hive import HiveCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import (
     And,
@@ -50,7 +51,7 @@ DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
 
 
 @pytest.fixture()
-def catalog() -> Catalog:
+def catalog_rest() -> Catalog:
     return load_catalog(
         "local",
         **{
@@ -64,40 +65,23 @@ def catalog() -> Catalog:
 
 
 @pytest.fixture()
-def table_test_null_nan(catalog: Catalog) -> Table:
-    return catalog.load_table("default.test_null_nan")
-
-
[email protected]()
-def table_test_null_nan_rewritten(catalog: Catalog) -> Table:
-    return catalog.load_table("default.test_null_nan_rewritten")
-
-
[email protected]()
-def table_test_limit(catalog: Catalog) -> Table:
-    return catalog.load_table("default.test_limit")
-
-
[email protected]()
-def table_test_all_types(catalog: Catalog) -> Table:
-    return catalog.load_table("default.test_all_types")
-
-
[email protected]()
-def table_test_table_version(catalog: Catalog) -> Table:
-    return catalog.load_table("default.test_table_version")
-
-
[email protected]()
-def table_test_table_sanitized_character(catalog: Catalog) -> Table:
-    return catalog.load_table("default.test_table_sanitized_character")
+def catalog_hive() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "hive",
+            "uri": "http://localhost:9083",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
 
 
 TABLE_NAME = ("default", "t1")
 
 
[email protected]()
-def table(catalog: Catalog) -> Table:
+def create_table(catalog: Catalog) -> Table:
     try:
         catalog.drop_table(TABLE_NAME)
     except NoSuchTableError:
@@ -115,7 +99,12 @@ def table(catalog: Catalog) -> Table:
 
 
 @pytest.mark.integration
-def test_table_properties(table: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_table_properties(catalog: Catalog) -> None:
+    if isinstance(catalog, HiveCatalog):
+        pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/275")
+    table = create_table(catalog)
+
     assert table.properties == DEFAULT_PROPERTIES
 
     with table.transaction() as transaction:
@@ -137,26 +126,10 @@ def test_table_properties(table: Table) -> None:
     assert table.properties == DEFAULT_PROPERTIES
 
 
[email protected]()
-def test_positional_mor_deletes(catalog: Catalog) -> Table:
-    """Table that has positional deletes"""
-    return catalog.load_table("default.test_positional_mor_deletes")
-
-
[email protected]()
-def test_table_add_column(catalog: Catalog) -> Table:
-    """Table that has a new column"""
-    return catalog.load_table("default.test_table_add_column")
-
-
[email protected]()
-def test_positional_mor_double_deletes(catalog: Catalog) -> Table:
-    """Table that has multiple positional deletes"""
-    return catalog.load_table("default.test_positional_mor_double_deletes")
-
-
 @pytest.mark.integration
-def test_pyarrow_nan(table_test_null_nan: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_nan(catalog: Catalog) -> None:
+    table_test_null_nan = catalog.load_table("default.test_null_nan")
     arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")).to_arrow()
     assert len(arrow_table) == 1
     assert arrow_table["idx"][0].as_py() == 1
@@ -164,7 +137,9 @@ def test_pyarrow_nan(table_test_null_nan: Table) -> None:
 
 
 @pytest.mark.integration
-def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_nan_rewritten(catalog: Catalog) -> None:
+    table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
     arrow_table = table_test_null_nan_rewritten.scan(
         row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")
     ).to_arrow()
@@ -174,14 +149,18 @@ def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
 
 
 @pytest.mark.integration
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162")
-def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None:
+def test_pyarrow_not_nan_count(catalog: Catalog) -> None:
+    table_test_null_nan = catalog.load_table("default.test_null_nan")
     not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_arrow()
     assert len(not_nan) == 2
 
 
 @pytest.mark.integration
-def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_duckdb_nan(catalog: Catalog) -> None:
+    table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
     con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan")
     result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE isnan(col_numeric)").fetchone()
     assert result[0] == 1
@@ -189,7 +168,9 @@ def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None:
 
 
 @pytest.mark.integration
-def test_pyarrow_limit(table_test_limit: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_limit(catalog: Catalog) -> None:
+    table_test_limit = catalog.load_table("default.test_limit")
     limited_result = table_test_limit.scan(selected_fields=("idx",), limit=1).to_arrow()
     assert len(limited_result) == 1
 
@@ -200,16 +181,20 @@ def test_pyarrow_limit(table_test_limit: Table) -> None:
     assert len(full_result) == 10
 
 
[email protected]("ignore")
 @pytest.mark.integration
-def test_ray_nan(table_test_null_nan_rewritten: Table) -> None:
[email protected]("ignore")
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_ray_nan(catalog: Catalog) -> None:
+    table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
     ray_dataset = table_test_null_nan_rewritten.scan().to_ray()
     assert ray_dataset.count() == 3
     assert math.isnan(ray_dataset.take()[0]["col_numeric"])
 
 
 @pytest.mark.integration
-def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_ray_nan_rewritten(catalog: Catalog) -> None:
+    table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
     ray_dataset = table_test_null_nan_rewritten.scan(
         row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")
     ).to_ray()
@@ -219,15 +204,19 @@ def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
 
 
 @pytest.mark.integration
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162")
-def test_ray_not_nan_count(table_test_null_nan_rewritten: Table) -> None:
+def test_ray_not_nan_count(catalog: Catalog) -> None:
+    table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
     ray_dataset = table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_ray()
     print(ray_dataset.take())
     assert ray_dataset.count() == 2
 
 
 @pytest.mark.integration
-def test_ray_all_types(table_test_all_types: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_ray_all_types(catalog: Catalog) -> None:
+    table_test_all_types = catalog.load_table("default.test_all_types")
     ray_dataset = table_test_all_types.scan().to_ray()
     pandas_dataframe = table_test_all_types.scan().to_pandas()
     assert ray_dataset.count() == pandas_dataframe.shape[0]
@@ -235,7 +224,9 @@ def test_ray_all_types(table_test_all_types: Table) -> None:
 
 
 @pytest.mark.integration
-def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
+    table_test_all_types = catalog.load_table("default.test_all_types")
     fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
     data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()]
     for data_file_path in data_file_paths:
@@ -248,7 +239,8 @@ def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None:
 
 
 @pytest.mark.integration
-def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_deletes(catalog: Catalog) -> None:
     # number, letter
     #  (1, 'a'),
     #  (2, 'b'),
@@ -262,6 +254,7 @@ def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None:
     #  (10, 'j'),
     #  (11, 'k'),
     #  (12, 'l')
+    test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes")
     arrow_table = test_positional_mor_deletes.scan().to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]
 
@@ -283,7 +276,8 @@ def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None:
 
 
 @pytest.mark.integration
-def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_deletes_double(catalog: Catalog) -> None:
     # number, letter
     #  (1, 'a'),
     #  (2, 'b'),
@@ -297,6 +291,7 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No
     #  (10, 'j'),
     #  (11, 'k'),
     #  (12, 'l')
+    test_positional_mor_double_deletes = catalog.load_table("default.test_positional_mor_double_deletes")
     arrow_table = test_positional_mor_double_deletes.scan().to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12]
 
@@ -318,6 +313,7 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No
 
 
 @pytest.mark.integration
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 def test_partitioned_tables(catalog: Catalog) -> None:
     for table_name, predicate in [
         ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"),
@@ -334,6 +330,7 @@ def test_partitioned_tables(catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
     unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
     arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow()
@@ -350,6 +347,7 @@ def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 def test_unpartitioned_fixed_table(catalog: Catalog) -> None:
     fixed_table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
     arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow()
@@ -368,19 +366,25 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
-def test_scan_tag(test_positional_mor_deletes: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_scan_tag(catalog: Catalog) -> None:
+    test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes")
     arrow_table = test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
 
 
 @pytest.mark.integration
-def test_scan_branch(test_positional_mor_deletes: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_scan_branch(catalog: Catalog) -> None:
+    test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes")
     arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]
 
 
 @pytest.mark.integration
-def test_filter_on_new_column(test_table_add_column: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_filter_on_new_column(catalog: Catalog) -> None:
+    test_table_add_column = catalog.load_table("default.test_table_add_column")
     arrow_table = test_table_add_column.scan(row_filter="b == '2'").to_arrow()
     assert arrow_table["b"].to_pylist() == ['2']
 
@@ -392,7 +396,12 @@ def test_filter_on_new_column(test_table_add_column: Table) -> None:
 
 
 @pytest.mark.integration
-def test_upgrade_table_version(table_test_table_version: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_upgrade_table_version(catalog: Catalog) -> None:
+    if isinstance(catalog, HiveCatalog):
+        pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/274")
+    table_test_table_version = catalog.load_table("default.test_table_version")
+
     assert table_test_table_version.format_version == 1
 
     with table_test_table_version.transaction() as transaction:
@@ -417,7 +426,9 @@ def test_upgrade_table_version(table_test_table_version: Table) -> None:
 
 
 @pytest.mark.integration
-def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None:
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_sanitize_character(catalog: Catalog) -> None:
+    table_test_table_sanitized_character = catalog.load_table("default.test_table_sanitized_character")
     arrow_table = table_test_table_sanitized_character.scan().to_arrow()
     assert len(arrow_table.schema.names), 1
     assert len(table_test_table_sanitized_character.schema().fields), 1
diff --git a/tests/test_integration_manifest.py b/tests/integration/test_rest_manifest.py
similarity index 100%
rename from tests/test_integration_manifest.py
rename to tests/integration/test_rest_manifest.py
diff --git a/tests/test_integration_schema.py b/tests/integration/test_rest_schema.py
similarity index 100%
rename from tests/test_integration_schema.py
rename to tests/integration/test_rest_schema.py
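
For anyone trying the new setup locally: once the services in dev/docker-compose-integration.yml are up, both provisioned catalogs can be reached from the host with PyIceberg. A minimal sketch, assuming the ports published by the compose file above (9083 for the Hive metastore, 9000 for MinIO) are mapped to localhost; swap "hive" for "rest" and port 8181 to go through the REST catalog instead:

    from pyiceberg.catalog import load_catalog

    # Hive metastore catalog, mirroring the `hive` service and credentials defined above.
    catalog = load_catalog(
        "hive",
        **{
            "type": "hive",
            "uri": "http://localhost:9083",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )

    # Load one of the tables created by dev/provision.py and read a few rows into Arrow.
    table = catalog.load_table("default.test_null_nan")
    print(table.scan(limit=3).to_arrow())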

