This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 06e2b2d Add Hive integration tests (#207)
06e2b2d is described below
commit 06e2b2df48e9ff383a09550e3e307d290fe39ddc
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jan 17 16:30:43 2024 +0100
Add Hive integration tests (#207)
* Add Hive for CI
* Add Hive integration tests
* Add missing licenses
* Fix
* Remove Arrow
* Add catalog
* Update test suite
* Whitespace
---
dev/Dockerfile | 10 +-
dev/docker-compose-integration.yml | 14 +
dev/hive/Dockerfile | 34 ++
dev/hive/core-site.xml | 53 +++
dev/provision.py | 527 ++++++++++-----------
dev/spark-defaults.conf | 20 +-
tests/integration/__init__.py | 16 +
.../test_catalogs.py} | 145 +++---
.../test_rest_manifest.py} | 0
.../test_rest_schema.py} | 0
10 files changed, 466 insertions(+), 353 deletions(-)
diff --git a/dev/Dockerfile b/dev/Dockerfile
index 1f001f5..44783d0 100644
--- a/dev/Dockerfile
+++ b/dev/Dockerfile
@@ -38,9 +38,8 @@ WORKDIR ${SPARK_HOME}
ENV SPARK_VERSION=3.4.2
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
-ENV ICEBERG_VERSION=1.4.0
-ENV AWS_SDK_VERSION=2.20.18
-ENV PYICEBERG_VERSION=0.4.0
+ENV ICEBERG_VERSION=1.4.2
+ENV PYICEBERG_VERSION=0.5.1
RUN curl --retry 3 -s -C -
https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz
-o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark
--strip-components 1 \
@@ -51,8 +50,7 @@ RUN curl -s
https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runt
&& mv
iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar
/opt/spark/jars
# Download AWS bundle
-RUN curl -s
https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
-Lo iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
- && mv iceberg-aws-bundle-${ICEBERG_VERSION}.jar /opt/spark/jars
+RUN curl -s
https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
-Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
COPY spark-defaults.conf /opt/spark/conf
ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
@@ -62,7 +60,7 @@ RUN chmod u+x /opt/spark/sbin/* && \
RUN pip3 install -q ipython
-RUN pip3 install "pyiceberg[s3fs]==${PYICEBERG_VERSION}"
+RUN pip3 install "pyiceberg[s3fs,hive]==${PYICEBERG_VERSION}"
COPY entrypoint.sh .
COPY provision.py .
diff --git a/dev/docker-compose-integration.yml
b/dev/docker-compose-integration.yml
index 658bd69..fccdcdc 100644
--- a/dev/docker-compose-integration.yml
+++ b/dev/docker-compose-integration.yml
@@ -25,6 +25,7 @@ services:
iceberg_net:
depends_on:
- rest
+ - hive
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
@@ -37,6 +38,7 @@ services:
- 8080:8080
links:
- rest:rest
+ - hive:hive
- minio:minio
rest:
image: tabulario/iceberg-rest
@@ -85,5 +87,17 @@ services:
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
+ hive:
+ build: hive/
+ container_name: hive
+ hostname: hive
+ networks:
+ iceberg_net:
+ ports:
+ - 9083:9083
+ environment:
+ SERVICE_NAME: "metastore"
+ SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
+
networks:
iceberg_net:
diff --git a/dev/hive/Dockerfile b/dev/hive/Dockerfile
new file mode 100644
index 0000000..ee63393
--- /dev/null
+++ b/dev/hive/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
-Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar
-Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar
/opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar
/opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
diff --git a/dev/hive/core-site.xml b/dev/hive/core-site.xml
new file mode 100644
index 0000000..b77332b
--- /dev/null
+++ b/dev/hive/core-site.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>s3a://warehouse/hive</value>
+ </property>
+ <property>
+ <name>fs.s3a.impl</name>
+ <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+ </property>
+ <property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>fs.s3a.endpoint</name>
+ <value>http://minio:9000</value>
+ </property>
+ <property>
+ <name>fs.s3a.access.key</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>fs.s3a.connection.ssl.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>fs.s3a.path.style.access</name>
+ <value>true</value>
+ </property>
+</configuration>
diff --git a/dev/provision.py b/dev/provision.py
index 9917cd3..e5048d2 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -24,318 +24,299 @@ from pyiceberg.types import FixedType, NestedField,
UUIDType
spark = SparkSession.builder.getOrCreate()
-spark.sql(
- """
- CREATE DATABASE IF NOT EXISTS default;
-"""
-)
-
-schema = Schema(
- NestedField(field_id=1, name="uuid_col", field_type=UUIDType(),
required=False),
- NestedField(field_id=2, name="fixed_col", field_type=FixedType(25),
required=False),
-)
-
-catalog = load_catalog(
- "local",
- **{
- "type": "rest",
- "uri": "http://rest:8181",
- "s3.endpoint": "http://minio:9000",
- "s3.access-key-id": "admin",
- "s3.secret-access-key": "password",
- },
-)
-
-catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned",
schema=schema)
-
-spark.sql(
- """
- INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES
- ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345'
AS BINARY)),
- ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231'
AS BINARY)),
- ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345'
AS BINARY)),
- ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111'
AS BINARY)),
- ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111'
AS BINARY));
+catalogs = {
+ 'rest': load_catalog(
+ "rest",
+ **{
+ "type": "rest",
+ "uri": "http://rest:8181",
+ "s3.endpoint": "http://minio:9000",
+ "s3.access-key-id": "admin",
+ "s3.secret-access-key": "password",
+ },
+ ),
+ 'hive': load_catalog(
+ "hive",
+ **{
+ "type": "hive",
+ "uri": "http://hive:9083",
+ "s3.endpoint": "http://minio:9000",
+ "s3.access-key-id": "admin",
+ "s3.secret-access-key": "password",
+ },
+ ),
+}
+
+for catalog_name, catalog in catalogs.items():
+ spark.sql(
+ f"""
+ CREATE DATABASE IF NOT EXISTS {catalog_name}.default;
"""
-)
+ )
-spark.sql(
- """
- CREATE OR REPLACE TABLE default.test_null_nan
- USING iceberg
- AS SELECT
- 1 AS idx,
- float('NaN') AS col_numeric
-UNION ALL SELECT
- 2 AS idx,
- null AS col_numeric
-UNION ALL SELECT
- 3 AS idx,
- 1 AS col_numeric
-"""
-)
-
-spark.sql(
- """
- CREATE OR REPLACE TABLE default.test_null_nan_rewritten
- USING iceberg
- AS SELECT * FROM default.test_null_nan
-"""
-)
+ schema = Schema(
+ NestedField(field_id=1, name="uuid_col", field_type=UUIDType(),
required=False),
+ NestedField(field_id=2, name="fixed_col", field_type=FixedType(25),
required=False),
+ )
-spark.sql(
- """
-CREATE OR REPLACE TABLE default.test_limit as
- SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx;
-"""
-)
+
catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned",
schema=schema)
-spark.sql(
- """
-CREATE OR REPLACE TABLE default.test_positional_mor_deletes (
- dt date,
- number integer,
- letter string
-)
-USING iceberg
-TBLPROPERTIES (
- 'write.delete.mode'='merge-on-read',
- 'write.update.mode'='merge-on-read',
- 'write.merge.mode'='merge-on-read',
- 'format-version'='2'
-);
-"""
-)
-
-# Partitioning is not really needed, but there is a bug:
-# https://github.com/apache/iceberg/pull/7685
-spark.sql(
- """
- ALTER TABLE default.test_positional_mor_deletes ADD PARTITION FIELD
years(dt) AS dt_years
-"""
-)
+ spark.sql(
+ f"""
+ INSERT INTO {catalog_name}.default.test_uuid_and_fixed_unpartitioned
VALUES
+ ('102cb62f-e6f8-4eb0-9973-d9b012ff0967',
CAST('1234567890123456789012345' AS BINARY)),
+ ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226',
CAST('1231231231231231231231231' AS BINARY)),
+ ('639cccce-c9d2-494a-a78c-278ab234f024',
CAST('12345678901234567ass12345' AS BINARY)),
+ ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b',
CAST('asdasasdads12312312312111' AS BINARY)),
+ ('923dae77-83d6-47cd-b4b0-d383e64ee57e',
CAST('qweeqwwqq1231231231231111' AS BINARY));
+ """
+ )
-spark.sql(
- """
-INSERT INTO default.test_positional_mor_deletes
-VALUES
- (CAST('2023-03-01' AS date), 1, 'a'),
- (CAST('2023-03-02' AS date), 2, 'b'),
- (CAST('2023-03-03' AS date), 3, 'c'),
- (CAST('2023-03-04' AS date), 4, 'd'),
- (CAST('2023-03-05' AS date), 5, 'e'),
- (CAST('2023-03-06' AS date), 6, 'f'),
- (CAST('2023-03-07' AS date), 7, 'g'),
- (CAST('2023-03-08' AS date), 8, 'h'),
- (CAST('2023-03-09' AS date), 9, 'i'),
- (CAST('2023-03-10' AS date), 10, 'j'),
- (CAST('2023-03-11' AS date), 11, 'k'),
- (CAST('2023-03-12' AS date), 12, 'l');
-"""
-)
-
-spark.sql(
- """
-ALTER TABLE default.test_positional_mor_deletes CREATE TAG tag_12
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan
+ USING iceberg
+ AS SELECT
+ 1 AS idx,
+ float('NaN') AS col_numeric
+ UNION ALL SELECT
+ 2 AS idx,
+ null AS col_numeric
+ UNION ALL SELECT
+ 3 AS idx,
+ 1 AS col_numeric
"""
-)
+ )
-spark.sql(
- """
-ALTER TABLE default.test_positional_mor_deletes CREATE BRANCH without_5
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan_rewritten
+ USING iceberg
+ AS SELECT * FROM default.test_null_nan
"""
-)
+ )
-spark.sql(
- """
-DELETE FROM default.test_positional_mor_deletes.branch_without_5 WHERE number
= 5
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE {catalog_name}.default.test_limit as
+ SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS
idx;
"""
-)
+ )
-spark.sql(
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_deletes
(
+ dt date,
+ number integer,
+ letter string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read',
+ 'format-version'='2'
+ );
"""
-DELETE FROM default.test_positional_mor_deletes WHERE number = 9
-"""
-)
+ )
-spark.sql(
- """
- CREATE OR REPLACE TABLE default.test_positional_mor_double_deletes (
- dt date,
- number integer,
- letter string
- )
- USING iceberg
- TBLPROPERTIES (
- 'write.delete.mode'='merge-on-read',
- 'write.update.mode'='merge-on-read',
- 'write.merge.mode'='merge-on-read',
- 'format-version'='2'
- );
-"""
-)
-
-# Partitioning is not really needed, but there is a bug:
-# https://github.com/apache/iceberg/pull/7685
-spark.sql(
- """
- ALTER TABLE default.test_positional_mor_double_deletes ADD PARTITION FIELD
years(dt) AS dt_years
-"""
-)
+ # Partitioning is not really needed, but there is a bug:
+ # https://github.com/apache/iceberg/pull/7685
+ spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes
ADD PARTITION FIELD years(dt) AS dt_years")
-spark.sql(
- """
-INSERT INTO default.test_positional_mor_double_deletes
-VALUES
- (CAST('2023-03-01' AS date), 1, 'a'),
- (CAST('2023-03-02' AS date), 2, 'b'),
- (CAST('2023-03-03' AS date), 3, 'c'),
- (CAST('2023-03-04' AS date), 4, 'd'),
- (CAST('2023-03-05' AS date), 5, 'e'),
- (CAST('2023-03-06' AS date), 6, 'f'),
- (CAST('2023-03-07' AS date), 7, 'g'),
- (CAST('2023-03-08' AS date), 8, 'h'),
- (CAST('2023-03-09' AS date), 9, 'i'),
- (CAST('2023-03-10' AS date), 10, 'j'),
- (CAST('2023-03-11' AS date), 11, 'k'),
- (CAST('2023-03-12' AS date), 12, 'l');
-"""
-)
-
-spark.sql(
+ spark.sql(
+ f"""
+ INSERT INTO {catalog_name}.default.test_positional_mor_deletes
+ VALUES
+ (CAST('2023-03-01' AS date), 1, 'a'),
+ (CAST('2023-03-02' AS date), 2, 'b'),
+ (CAST('2023-03-03' AS date), 3, 'c'),
+ (CAST('2023-03-04' AS date), 4, 'd'),
+ (CAST('2023-03-05' AS date), 5, 'e'),
+ (CAST('2023-03-06' AS date), 6, 'f'),
+ (CAST('2023-03-07' AS date), 7, 'g'),
+ (CAST('2023-03-08' AS date), 8, 'h'),
+ (CAST('2023-03-09' AS date), 9, 'i'),
+ (CAST('2023-03-10' AS date), 10, 'j'),
+ (CAST('2023-03-11' AS date), 11, 'k'),
+ (CAST('2023-03-12' AS date), 12, 'l');
"""
- DELETE FROM default.test_positional_mor_double_deletes WHERE number = 9
-"""
-)
+ )
+
+ spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes
CREATE TAG tag_12")
+
+ spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes
CREATE BRANCH without_5")
+
+ spark.sql(f"DELETE FROM
{catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE
number = 5")
+
+ spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes
WHERE number = 9")
-spark.sql(
- """
- DELETE FROM default.test_positional_mor_double_deletes WHERE letter == 'f'
-"""
-)
-
-all_types_dataframe = (
- spark.range(0, 5, 1, 5)
- .withColumnRenamed("id", "longCol")
- .withColumn("intCol", expr("CAST(longCol AS INT)"))
- .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
- .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
- .withColumn("dateCol", date_add(current_date(), 1))
- .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
- .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
- .withColumn("booleanCol", expr("longCol > 5"))
- .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
- .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
- .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
- .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
- .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
- .withColumn("arrayCol", expr("ARRAY(longCol)"))
- .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))
-)
-
-all_types_dataframe.writeTo("default.test_all_types").tableProperty("format-version",
"2").partitionedBy(
- "intCol"
-).createOrReplace()
-
-for table_name, partition in [
- ("test_partitioned_by_identity", "ts"),
- ("test_partitioned_by_years", "years(dt)"),
- ("test_partitioned_by_months", "months(dt)"),
- ("test_partitioned_by_days", "days(ts)"),
- ("test_partitioned_by_hours", "hours(ts)"),
- ("test_partitioned_by_truncate", "truncate(1, letter)"),
- ("test_partitioned_by_bucket", "bucket(16, number)"),
-]:
spark.sql(
f"""
- CREATE OR REPLACE TABLE default.{table_name} (
+ CREATE OR REPLACE TABLE
{catalog_name}.default.test_positional_mor_double_deletes (
dt date,
- ts timestamp,
number integer,
letter string
)
- USING iceberg;
+ USING iceberg
+ TBLPROPERTIES (
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read',
+ 'format-version'='2'
+ );
"""
)
- spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD
{partition}")
+ # Partitioning is not really needed, but there is a bug:
+ # https://github.com/apache/iceberg/pull/7685
+ spark.sql(f"ALTER TABLE
{catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD
years(dt) AS dt_years")
spark.sql(
f"""
- INSERT INTO default.{table_name}
+ INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes
VALUES
- (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp),
1, 'a'),
- (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp),
2, 'b'),
- (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp),
3, 'c'),
- (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp),
4, 'd'),
- (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp),
5, 'e'),
- (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp),
6, 'f'),
- (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp),
7, 'g'),
- (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp),
8, 'h'),
- (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp),
9, 'i'),
- (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp),
10, 'j'),
- (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp),
11, 'k'),
- (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp),
12, 'l');
+ (CAST('2023-03-01' AS date), 1, 'a'),
+ (CAST('2023-03-02' AS date), 2, 'b'),
+ (CAST('2023-03-03' AS date), 3, 'c'),
+ (CAST('2023-03-04' AS date), 4, 'd'),
+ (CAST('2023-03-05' AS date), 5, 'e'),
+ (CAST('2023-03-06' AS date), 6, 'f'),
+ (CAST('2023-03-07' AS date), 7, 'g'),
+ (CAST('2023-03-08' AS date), 8, 'h'),
+ (CAST('2023-03-09' AS date), 9, 'i'),
+ (CAST('2023-03-10' AS date), 10, 'j'),
+ (CAST('2023-03-11' AS date), 11, 'k'),
+ (CAST('2023-03-12' AS date), 12, 'l');
"""
)
-# There is an issue with CREATE OR REPLACE
-# https://github.com/apache/iceberg/issues/8756
-spark.sql(
+ spark.sql(f"DELETE FROM
{catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9")
+
+ spark.sql(f"DELETE FROM
{catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'")
+
+ all_types_dataframe = (
+ spark.range(0, 5, 1, 5)
+ .withColumnRenamed("id", "longCol")
+ .withColumn("intCol", expr("CAST(longCol AS INT)"))
+ .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+ .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+ .withColumn("dateCol", date_add(current_date(), 1))
+ .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+ .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+ .withColumn("booleanCol", expr("longCol > 5"))
+ .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
+ .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
+ .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
+ .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
+ .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
+ .withColumn("arrayCol", expr("ARRAY(longCol)"))
+ .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))
+ )
+
+
all_types_dataframe.writeTo(f"{catalog_name}.default.test_all_types").tableProperty("format-version",
"2").partitionedBy(
+ "intCol"
+ ).createOrReplace()
+
+ for table_name, partition in [
+ ("test_partitioned_by_identity", "ts"),
+ ("test_partitioned_by_years", "years(dt)"),
+ ("test_partitioned_by_months", "months(dt)"),
+ ("test_partitioned_by_days", "days(ts)"),
+ ("test_partitioned_by_hours", "hours(ts)"),
+ ("test_partitioned_by_truncate", "truncate(1, letter)"),
+ ("test_partitioned_by_bucket", "bucket(16, number)"),
+ ]:
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE {catalog_name}.default.{table_name} (
+ dt date,
+ ts timestamp,
+ number integer,
+ letter string
+ )
+ USING iceberg;
+ """
+ )
+
+ spark.sql(f"ALTER TABLE {catalog_name}.default.{table_name} ADD
PARTITION FIELD {partition}")
+
+ spark.sql(
+ f"""
+ INSERT INTO {catalog_name}.default.{table_name}
+ VALUES
+ (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS
timestamp), 1, 'a'),
+ (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS
timestamp), 2, 'b'),
+ (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS
timestamp), 3, 'c'),
+ (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS
timestamp), 4, 'd'),
+ (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS
timestamp), 5, 'e'),
+ (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS
timestamp), 6, 'f'),
+ (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS
timestamp), 7, 'g'),
+ (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS
timestamp), 8, 'h'),
+ (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS
timestamp), 9, 'i'),
+ (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS
timestamp), 10, 'j'),
+ (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS
timestamp), 11, 'k'),
+ (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS
timestamp), 12, 'l');
+ """
+ )
+
+ # There is an issue with CREATE OR REPLACE
+ # https://github.com/apache/iceberg/issues/8756
+ spark.sql(f"DROP TABLE IF EXISTS
{catalog_name}.default.test_table_version")
+
+ spark.sql(
+ f"""
+ CREATE TABLE {catalog_name}.default.test_table_version (
+ dt date,
+ number integer,
+ letter string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'format-version'='1'
+ );
"""
-DROP TABLE IF EXISTS default.test_table_version
-"""
-)
+ )
-spark.sql(
+ spark.sql(
+ f"""
+ CREATE TABLE {catalog_name}.default.test_table_sanitized_character (
+ `letter/abc` string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'format-version'='1'
+ );
"""
-CREATE TABLE default.test_table_version (
- dt date,
- number integer,
- letter string
-)
-USING iceberg
-TBLPROPERTIES (
- 'format-version'='1'
-);
-"""
-)
-
-spark.sql(
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {catalog_name}.default.test_table_sanitized_character
+ VALUES
+ ('123')
"""
-CREATE TABLE default.test_table_sanitized_character (
- `letter/abc` string
-)
-USING iceberg
-TBLPROPERTIES (
- 'format-version'='1'
-);
-"""
-)
-
-spark.sql(
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {catalog_name}.default.test_table_sanitized_character
+ VALUES
+ ('123')
"""
-INSERT INTO default.test_table_sanitized_character
-VALUES
- ('123')
-"""
-)
+ )
-spark.sql(
+ spark.sql(
+ f"""
+ CREATE TABLE {catalog_name}.default.test_table_add_column (
+ a string
+ )
+ USING iceberg
"""
-CREATE TABLE default.test_table_add_column (
- a string
-)
-USING iceberg
-"""
-)
+ )
-spark.sql("INSERT INTO default.test_table_add_column VALUES ('1')")
+ spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column
VALUES ('1')")
-spark.sql(
- """
-ALTER TABLE default.test_table_add_column ADD COLUMN b string
-"""
-)
+ spark.sql(f"ALTER TABLE {catalog_name}.default.test_table_add_column ADD
COLUMN b string")
-spark.sql("INSERT INTO default.test_table_add_column VALUES ('2', '2')")
+ spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column
VALUES ('2', '2')")
diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf
index 56c3454..2316336 100644
--- a/dev/spark-defaults.conf
+++ b/dev/spark-defaults.conf
@@ -16,13 +16,19 @@
#
spark.sql.extensions
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
-spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
-spark.sql.catalog.demo.type rest
-spark.sql.catalog.demo.uri http://rest:8181
-spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
-spark.sql.catalog.demo.warehouse s3://warehouse/wh/
-spark.sql.catalog.demo.s3.endpoint http://minio:9000
-spark.sql.defaultCatalog demo
+spark.sql.catalog.rest org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.rest.type rest
+spark.sql.catalog.rest.uri http://rest:8181
+spark.sql.catalog.rest.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.rest.warehouse s3://warehouse/rest/
+spark.sql.catalog.rest.s3.endpoint http://minio:9000
+spark.sql.catalog.hive org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.hive.type hive
+spark.sql.catalog.hive.uri http://hive:9083
+spark.sql.catalog.hive.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.hive.warehouse s3://warehouse/hive/
+spark.sql.catalog.hive.s3.endpoint http://minio:9000
+spark.sql.defaultCatalog rest
spark.eventLog.enabled true
spark.eventLog.dir /home/iceberg/spark-events
spark.history.fs.logDirectory /home/iceberg/spark-events
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/integration/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/test_integration.py b/tests/integration/test_catalogs.py
similarity index 70%
rename from tests/test_integration.py
rename to tests/integration/test_catalogs.py
index 2a173be..3fbdb69 100644
--- a/tests/test_integration.py
+++ b/tests/integration/test_catalogs.py
@@ -25,6 +25,7 @@ import pytest
from pyarrow.fs import S3FileSystem
from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import (
And,
@@ -50,7 +51,7 @@ DEFAULT_PROPERTIES = {'write.parquet.compression-codec':
'zstd'}
@pytest.fixture()
-def catalog() -> Catalog:
+def catalog_rest() -> Catalog:
return load_catalog(
"local",
**{
@@ -64,40 +65,23 @@ def catalog() -> Catalog:
@pytest.fixture()
-def table_test_null_nan(catalog: Catalog) -> Table:
- return catalog.load_table("default.test_null_nan")
-
-
-@pytest.fixture()
-def table_test_null_nan_rewritten(catalog: Catalog) -> Table:
- return catalog.load_table("default.test_null_nan_rewritten")
-
-
-@pytest.fixture()
-def table_test_limit(catalog: Catalog) -> Table:
- return catalog.load_table("default.test_limit")
-
-
-@pytest.fixture()
-def table_test_all_types(catalog: Catalog) -> Table:
- return catalog.load_table("default.test_all_types")
-
-
-@pytest.fixture()
-def table_test_table_version(catalog: Catalog) -> Table:
- return catalog.load_table("default.test_table_version")
-
-
-@pytest.fixture()
-def table_test_table_sanitized_character(catalog: Catalog) -> Table:
- return catalog.load_table("default.test_table_sanitized_character")
+def catalog_hive() -> Catalog:
+ return load_catalog(
+ "local",
+ **{
+ "type": "hive",
+ "uri": "http://localhost:9083",
+ "s3.endpoint": "http://localhost:9000",
+ "s3.access-key-id": "admin",
+ "s3.secret-access-key": "password",
+ },
+ )
TABLE_NAME = ("default", "t1")
-@pytest.fixture()
-def table(catalog: Catalog) -> Table:
+def create_table(catalog: Catalog) -> Table:
try:
catalog.drop_table(TABLE_NAME)
except NoSuchTableError:
@@ -115,7 +99,12 @@ def table(catalog: Catalog) -> Table:
@pytest.mark.integration
-def test_table_properties(table: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_table_properties(catalog: Catalog) -> None:
+ if isinstance(catalog, HiveCatalog):
+ pytest.skip("Not yet implemented:
https://github.com/apache/iceberg-python/issues/275")
+ table = create_table(catalog)
+
assert table.properties == DEFAULT_PROPERTIES
with table.transaction() as transaction:
@@ -137,26 +126,10 @@ def test_table_properties(table: Table) -> None:
assert table.properties == DEFAULT_PROPERTIES
-@pytest.fixture()
-def test_positional_mor_deletes(catalog: Catalog) -> Table:
- """Table that has positional deletes"""
- return catalog.load_table("default.test_positional_mor_deletes")
-
-
-@pytest.fixture()
-def test_table_add_column(catalog: Catalog) -> Table:
- """Table that has a new column"""
- return catalog.load_table("default.test_table_add_column")
-
-
-@pytest.fixture()
-def test_positional_mor_double_deletes(catalog: Catalog) -> Table:
- """Table that has multiple positional deletes"""
- return catalog.load_table("default.test_positional_mor_double_deletes")
-
-
@pytest.mark.integration
-def test_pyarrow_nan(table_test_null_nan: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_nan(catalog: Catalog) -> None:
+ table_test_null_nan = catalog.load_table("default.test_null_nan")
arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"),
selected_fields=("idx", "col_numeric")).to_arrow()
assert len(arrow_table) == 1
assert arrow_table["idx"][0].as_py() == 1
@@ -164,7 +137,9 @@ def test_pyarrow_nan(table_test_null_nan: Table) -> None:
@pytest.mark.integration
-def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_nan_rewritten(catalog: Catalog) -> None:
+ table_test_null_nan_rewritten =
catalog.load_table("default.test_null_nan_rewritten")
arrow_table = table_test_null_nan_rewritten.scan(
row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")
).to_arrow()
@@ -174,14 +149,18 @@ def
test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
@pytest.mark.skip(reason="Fixing issues with NaN's:
https://github.com/apache/arrow/issues/34162")
-def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None:
+def test_pyarrow_not_nan_count(catalog: Catalog) -> None:
+ table_test_null_nan = catalog.load_table("default.test_null_nan")
not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"),
selected_fields=("idx",)).to_arrow()
assert len(not_nan) == 2
@pytest.mark.integration
-def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
+def test_duckdb_nan(catalog: Catalog) -> None:
+ table_test_null_nan_rewritten =
catalog.load_table("default.test_null_nan_rewritten")
con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan")
result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE
isnan(col_numeric)").fetchone()
assert result[0] == 1
@@ -189,7 +168,9 @@ def test_duckdb_nan(table_test_null_nan_rewritten: Table)
-> None:
@pytest.mark.integration
-def test_pyarrow_limit(table_test_limit: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_limit(catalog: Catalog) -> None:
+ table_test_limit = catalog.load_table("default.test_limit")
limited_result = table_test_limit.scan(selected_fields=("idx",),
limit=1).to_arrow()
assert len(limited_result) == 1
@@ -200,16 +181,20 @@ def test_pyarrow_limit(table_test_limit: Table) -> None:
assert len(full_result) == 10
-@pytest.mark.filterwarnings("ignore")
@pytest.mark.integration
-def test_ray_nan(table_test_null_nan_rewritten: Table) -> None:
+@pytest.mark.filterwarnings("ignore")
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_ray_nan(catalog: Catalog) -> None:
+ table_test_null_nan_rewritten =
catalog.load_table("default.test_null_nan_rewritten")
ray_dataset = table_test_null_nan_rewritten.scan().to_ray()
assert ray_dataset.count() == 3
assert math.isnan(ray_dataset.take()[0]["col_numeric"])
@pytest.mark.integration
-def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_ray_nan_rewritten(catalog: Catalog) -> None:
+ table_test_null_nan_rewritten =
catalog.load_table("default.test_null_nan_rewritten")
ray_dataset = table_test_null_nan_rewritten.scan(
row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")
).to_ray()
@@ -219,15 +204,19 @@ def test_ray_nan_rewritten(table_test_null_nan_rewritten:
Table) -> None:
@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
@pytest.mark.skip(reason="Fixing issues with NaN's:
https://github.com/apache/arrow/issues/34162")
-def test_ray_not_nan_count(table_test_null_nan_rewritten: Table) -> None:
+def test_ray_not_nan_count(catalog: Catalog) -> None:
+ table_test_null_nan_rewritten =
catalog.load_table("default.test_null_nan_rewritten")
ray_dataset =
table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"),
selected_fields=("idx",)).to_ray()
print(ray_dataset.take())
assert ray_dataset.count() == 2
@pytest.mark.integration
-def test_ray_all_types(table_test_all_types: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_ray_all_types(catalog: Catalog) -> None:
+ table_test_all_types = catalog.load_table("default.test_all_types")
ray_dataset = table_test_all_types.scan().to_ray()
pandas_dataframe = table_test_all_types.scan().to_pandas()
assert ray_dataset.count() == pandas_dataframe.shape[0]
@@ -235,7 +224,9 @@ def test_ray_all_types(table_test_all_types: Table) -> None:
@pytest.mark.integration
-def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
+ table_test_all_types = catalog.load_table("default.test_all_types")
fs = S3FileSystem(endpoint_override="http://localhost:9000",
access_key="admin", secret_key="password")
data_file_paths = [task.file.file_path for task in
table_test_all_types.scan().plan_files()]
for data_file_path in data_file_paths:
@@ -248,7 +239,8 @@ def test_pyarrow_to_iceberg_all_types(table_test_all_types:
Table) -> None:
@pytest.mark.integration
-def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_deletes(catalog: Catalog) -> None:
# number, letter
# (1, 'a'),
# (2, 'b'),
@@ -262,6 +254,7 @@ def test_pyarrow_deletes(test_positional_mor_deletes:
Table) -> None:
# (10, 'j'),
# (11, 'k'),
# (12, 'l')
+ test_positional_mor_deletes =
catalog.load_table("default.test_positional_mor_deletes")
arrow_table = test_positional_mor_deletes.scan().to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10,
11, 12]
@@ -283,7 +276,8 @@ def test_pyarrow_deletes(test_positional_mor_deletes:
Table) -> None:
@pytest.mark.integration
-def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) ->
None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_pyarrow_deletes_double(catalog: Catalog) -> None:
# number, letter
# (1, 'a'),
# (2, 'b'),
@@ -297,6 +291,7 @@ def
test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No
# (10, 'j'),
# (11, 'k'),
# (12, 'l')
+ test_positional_mor_double_deletes =
catalog.load_table("default.test_positional_mor_double_deletes")
arrow_table = test_positional_mor_double_deletes.scan().to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11,
12]
@@ -318,6 +313,7 @@ def
test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No
@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
def test_partitioned_tables(catalog: Catalog) -> None:
for table_name, predicate in [
("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"),
@@ -334,6 +330,7 @@ def test_partitioned_tables(catalog: Catalog) -> None:
@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
unpartitioned_uuid =
catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col ==
'102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow()
@@ -350,6 +347,7 @@ def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
def test_unpartitioned_fixed_table(catalog: Catalog) -> None:
fixed_table =
catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col",
b"1234567890123456789012345")).to_arrow()
@@ -368,19 +366,25 @@ def test_unpartitioned_fixed_table(catalog: Catalog) ->
None:
@pytest.mark.integration
-def test_scan_tag(test_positional_mor_deletes: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_scan_tag(catalog: Catalog) -> None:
+ test_positional_mor_deletes =
catalog.load_table("default.test_positional_mor_deletes")
arrow_table =
test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12]
@pytest.mark.integration
-def test_scan_branch(test_positional_mor_deletes: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_scan_branch(catalog: Catalog) -> None:
+ test_positional_mor_deletes =
catalog.load_table("default.test_positional_mor_deletes")
arrow_table =
test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10,
11, 12]
@pytest.mark.integration
-def test_filter_on_new_column(test_table_add_column: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_filter_on_new_column(catalog: Catalog) -> None:
+ test_table_add_column = catalog.load_table("default.test_table_add_column")
arrow_table = test_table_add_column.scan(row_filter="b == '2'").to_arrow()
assert arrow_table["b"].to_pylist() == ['2']
@@ -392,7 +396,12 @@ def test_filter_on_new_column(test_table_add_column:
Table) -> None:
@pytest.mark.integration
-def test_upgrade_table_version(table_test_table_version: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_upgrade_table_version(catalog: Catalog) -> None:
+ if isinstance(catalog, HiveCatalog):
+ pytest.skip("Not yet implemented:
https://github.com/apache/iceberg-python/issues/274")
+ table_test_table_version = catalog.load_table("default.test_table_version")
+
assert table_test_table_version.format_version == 1
with table_test_table_version.transaction() as transaction:
@@ -417,7 +426,9 @@ def test_upgrade_table_version(table_test_table_version:
Table) -> None:
@pytest.mark.integration
-def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None:
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'),
pytest.lazy_fixture('catalog_rest')])
+def test_sanitize_character(catalog: Catalog) -> None:
+ table_test_table_sanitized_character =
catalog.load_table("default.test_table_sanitized_character")
arrow_table = table_test_table_sanitized_character.scan().to_arrow()
assert len(arrow_table.schema.names), 1
assert len(table_test_table_sanitized_character.schema().fields), 1
diff --git a/tests/test_integration_manifest.py
b/tests/integration/test_rest_manifest.py
similarity index 100%
rename from tests/test_integration_manifest.py
rename to tests/integration/test_rest_manifest.py
diff --git a/tests/test_integration_schema.py
b/tests/integration/test_rest_schema.py
similarity index 100%
rename from tests/test_integration_schema.py
rename to tests/integration/test_rest_schema.py