This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 05a3bd42c4 [python] Introduce lance file format support to pypaimon (#6746)
05a3bd42c4 is described below
commit 05a3bd42c4c25eea43fb8a8129311aeca3ee3d3f
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Dec 5 09:19:08 2025 +0800
[python] Introduce lance file format support to pypaimon (#6746)
---
.github/workflows/paimon-python-checks.yml | 2 +-
docs/content/concepts/spec/fileformat.md | 88 ++++++++-
paimon-python/dev/requirements.txt | 4 +-
paimon-python/pypaimon/common/core_options.py | 1 +
paimon-python/pypaimon/common/file_io.py | 18 ++
.../pypaimon/read/reader/format_lance_reader.py | 72 +++++++
paimon-python/pypaimon/read/reader/lance_utils.py | 58 ++++++
paimon-python/pypaimon/read/split_read.py | 4 +
.../sample/rest_catalog_ray_lance_sample.py | 210 +++++++++++++++++++++
.../pypaimon/tests/reader_append_only_test.py | 31 +++
.../pypaimon/tests/reader_primary_key_test.py | 55 ++++++
.../pypaimon/tests/rest/rest_base_test.py | 36 ++++
.../rest/rest_catalog_commit_snapshot_test.py | 74 ++++++++
.../pypaimon/tests/rest/rest_read_write_test.py | 97 ++++++++++
.../pypaimon/write/writer/data_blob_writer.py | 2 +
paimon-python/pypaimon/write/writer/data_writer.py | 2 +
16 files changed, 751 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 80277e701c..e6a10e8949 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -94,7 +94,7 @@ jobs:
python -m pip install -q readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
else
python -m pip install --upgrade pip
- python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
+ python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
fi
- name: Run lint-python.sh
shell: bash
diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md
index 5c69b494cc..4a5d2a4d7c 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -26,10 +26,11 @@ under the License.
# File Format
-Currently, supports Parquet, Avro, ORC, CSV, JSON file formats.
+Currently, supports Parquet, Avro, ORC, CSV, JSON, and Lance file formats.
- Recommended column format is Parquet, which has a high compression rate and fast column projection queries.
- Recommended row based format is Avro, which has good performance in reading and writing full row (all columns).
- Recommended testing format is CSV, which has better readability but the worst read-write performance.
+- Recommended format for ML workloads is Lance, which is optimized for vector search and machine learning use cases.
## PARQUET
@@ -640,3 +641,88 @@ The following table lists the type mapping from Paimon type to JSON type.
</tr>
</tbody>
</table>
+
+## LANCE
+
+Lance is a modern columnar data format optimized for machine learning and vector search workloads. It provides high-performance read and write operations with native support for Apache Arrow.
+
+The following table lists the type mapping from Paimon type to Lance (Arrow) type.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Paimon Type</th>
+ <th class="text-center">Lance (Arrow) type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>CHAR / VARCHAR / STRING</td>
+ <td>UTF8</td>
+ </tr>
+ <tr>
+ <td>BOOLEAN</td>
+ <td>BOOL</td>
+ </tr>
+ <tr>
+ <td>BINARY / VARBINARY</td>
+ <td>BINARY</td>
+ </tr>
+ <tr>
+ <td>DECIMAL(P, S)</td>
+ <td>DECIMAL128(P, S)</td>
+ </tr>
+ <tr>
+ <td>TINYINT</td>
+ <td>INT8</td>
+ </tr>
+ <tr>
+ <td>SMALLINT</td>
+ <td>INT16</td>
+ </tr>
+ <tr>
+ <td>INT</td>
+ <td>INT32</td>
+ </tr>
+ <tr>
+ <td>BIGINT</td>
+ <td>INT64</td>
+ </tr>
+ <tr>
+ <td>FLOAT</td>
+ <td>FLOAT</td>
+ </tr>
+ <tr>
+ <td>DOUBLE</td>
+ <td>DOUBLE</td>
+ </tr>
+ <tr>
+ <td>DATE</td>
+ <td>DATE32</td>
+ </tr>
+ <tr>
+ <td>TIME</td>
+ <td>TIME32 / TIME64</td>
+ </tr>
+ <tr>
+ <td>TIMESTAMP(P)</td>
+ <td>TIMESTAMP (unit based on precision)</td>
+ </tr>
+ <tr>
+ <td>ARRAY</td>
+ <td>LIST</td>
+ </tr>
+ <tr>
+ <td>MULTISET</td>
+ <td>LIST</td>
+ </tr>
+ <tr>
+ <td>ROW</td>
+ <td>STRUCT</td>
+ </tr>
+ </tbody>
+</table>
+
+Limitations:
+1. Lance file format does not support `MAP` type.
+2. Lance file format does not support `TIMESTAMP_LOCAL_ZONE` type.
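
For reference, a minimal sketch of declaring a Lance-format table through pypaimon, based on the `file.format` option exercised in the tests below; the filesystem-catalog setup, warehouse path, and table name here are illustrative assumptions:

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    # Illustrative local catalog; any supported pypaimon catalog works the same way.
    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon_warehouse'})
    catalog.create_database('default', True)

    pa_schema = pa.schema([('id', pa.int64()), ('name', pa.string())])
    # 'file.format': 'lance' selects the Lance reader/writer introduced by this commit.
    schema = Schema.from_pyarrow_schema(pa_schema, options={'file.format': 'lance'})
    catalog.create_table('default.lance_demo', schema, False)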
diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt
index a08a0ba44a..342d7a40f4 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -38,4 +38,6 @@ pyarrow==16; python_version >= "3.8"
ray==2.48.0
readerwriterlock==1.0.9
zstandard==0.19.0; python_version<"3.9"
-zstandard==0.24.0; python_version>="3.9"
\ No newline at end of file
+zstandard==0.24.0; python_version>="3.9"
+pylance==0.39.0; python_version>="3.9"
+pylance==0.10.18; python_version>="3.8" and python_version<"3.9"
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 0686132979..0e3fa1bacb 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -43,6 +43,7 @@ class CoreOptions(str, Enum):
FILE_FORMAT_AVRO = "avro"
FILE_FORMAT_PARQUET = "parquet"
FILE_FORMAT_BLOB = "blob"
+ FILE_FORMAT_LANCE = "lance"
FILE_COMPRESSION = "file.compression"
FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
FILE_FORMAT_PER_LEVEL = "file.format.per.level"
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 15e1d8d8d9..1220074164 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -383,6 +383,24 @@ class FileIO:
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)
+ def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+ try:
+ import lance
+ from pypaimon.read.reader.lance_utils import to_lance_specified
+ file_path_for_lance, storage_options = to_lance_specified(self, path)
+
+ writer = lance.file.LanceFileWriter(
+ file_path_for_lance, data.schema, storage_options=storage_options, **kwargs)
+ try:
+ # Write all batches
+ for batch in data.to_batches():
+ writer.write_batch(batch)
+ finally:
+ writer.close()
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e
+
def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
# Validate input constraints
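
As a quick illustration, the new `FileIO.write_lance` helper takes a target path and a `pyarrow.Table`. A sketch under the assumption that a `file_io` instance already exists and that the target path is illustrative:

    import pyarrow as pa

    # file_io: an existing pypaimon FileIO instance (assumed); path points into a table's data directory.
    data = pa.table({'id': [1, 2, 3], 'name': ['a', 'b', 'c']})
    file_io.write_lance('/tmp/warehouse/default.db/t/bucket-0/data-0.lance', data)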
diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py
new file mode 100644
index 0000000000..a6b3277167
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py
@@ -0,0 +1,72 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional, Any
+
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.lance_utils import to_lance_specified
+
+
+class FormatLanceReader(RecordBatchReader):
+ """
+ A Format Reader that reads record batch from a Lance file using PyArrow,
+ and filters it based on the provided predicate and projection.
+ """
+
+ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+ push_down_predicate: Any, batch_size: int = 4096):
+ """Initialize Lance reader."""
+ import lance
+
+ file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)
+
+ columns_for_lance = read_fields if read_fields else None
+ lance_reader = lance.file.LanceFileReader(
+ file_path_for_lance,
+ storage_options=storage_options,
+ columns=columns_for_lance)
+ reader_results = lance_reader.read_all()
+
+ # Convert to PyArrow table
+ pa_table = reader_results.to_table()
+
+ if push_down_predicate is not None:
+ in_memory_dataset = ds.InMemoryDataset(pa_table)
+ scanner = in_memory_dataset.scanner(filter=push_down_predicate, batch_size=batch_size)
+ self.reader = scanner.to_reader()
+ else:
+ self.reader = iter(pa_table.to_batches(max_chunksize=batch_size))
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ try:
+ # Handle both scanner reader and iterator
+ if hasattr(self.reader, 'read_next_batch'):
+ return self.reader.read_next_batch()
+ else:
+ # Iterator of batches
+ return next(self.reader)
+ except StopIteration:
+ return None
+
+ def close(self):
+ if self.reader is not None:
+ self.reader = None
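
A minimal consumption sketch for this reader, following the `RecordBatchReader` contract shown above; the `file_io` instance and the `.lance` file path are assumptions, and no predicate pushdown is applied here:

    from pypaimon.read.reader.format_lance_reader import FormatLanceReader

    # file_io: an existing FileIO instance (assumed).
    reader = FormatLanceReader(file_io, '/tmp/warehouse/default.db/t/bucket-0/data-0.lance',
                               read_fields=['id', 'name'], push_down_predicate=None)
    try:
        batch = reader.read_arrow_batch()
        while batch is not None:        # read_arrow_batch() returns None when exhausted
            print(batch.num_rows)
            batch = reader.read_arrow_batch()
    finally:
        reader.close()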
diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py
new file mode 100644
index 0000000000..c7d768e631
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/lance_utils.py
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+from typing import Dict, Optional, Tuple
+
+from pypaimon.common.config import OssOptions
+from pypaimon.common.file_io import FileIO
+
+
+def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]:
+ """Convert path and extract storage options for Lance format."""
+ scheme, _, _ = file_io.parse_location(file_path)
+ storage_options = None
+ file_path_for_lance = file_io.to_filesystem_path(file_path)
+
+ if scheme in {'file', None} or not scheme:
+ if not os.path.isabs(file_path_for_lance):
+ file_path_for_lance = os.path.abspath(file_path_for_lance)
+ else:
+ file_path_for_lance = file_path
+
+ if scheme == 'oss':
+ storage_options = {}
+ if hasattr(file_io, 'properties'):
+ endpoint = file_io.properties.get(OssOptions.OSS_ENDPOINT)
+ if endpoint:
+ if not endpoint.startswith('http://') and not endpoint.startswith('https://'):
+ storage_options['endpoint'] = f"https://{endpoint}"
+ else:
+ storage_options['endpoint'] = endpoint
+
+ if OssOptions.OSS_ACCESS_KEY_ID in file_io.properties:
+ storage_options['access_key_id'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_ID]
+ if OssOptions.OSS_ACCESS_KEY_SECRET in file_io.properties:
+ storage_options['secret_access_key'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_SECRET]
+ if OssOptions.OSS_SECURITY_TOKEN in file_io.properties:
+ storage_options['session_token'] = file_io.properties[OssOptions.OSS_SECURITY_TOKEN]
+ storage_options['virtual_hosted_style_request'] = 'true'
+
+ file_path_for_lance = file_path.replace('oss://', 's3://')
+
+ return file_path_for_lance, storage_options
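
To make the path and option translation concrete, a sketch of the expected behavior for an OSS location; the `file_io` instance and the credential values in `file_io.properties` are assumptions, and the bucket and path are illustrative:

    from pypaimon.read.reader.lance_utils import to_lance_specified

    # file_io: a FileIO configured for OSS (assumed), with endpoint and credentials in file_io.properties.
    path, opts = to_lance_specified(file_io, 'oss://my-bucket/warehouse/default.db/t/data-0.lance')
    # path -> 's3://my-bucket/warehouse/default.db/t/data-0.lance'
    # opts -> endpoint (https-prefixed if no scheme was given), access_key_id, secret_access_key,
    #         optional session_token, and virtual_hosted_style_request='true'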
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 92152db7ee..94d832eff4 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -37,6 +37,7 @@ from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -101,6 +102,9 @@ class SplitRead(ABC):
blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields, self.read_fields, read_arrow_predicate, blob_as_descriptor)
+ elif file_format == CoreOptions.FILE_FORMAT_LANCE:
+ format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
+ read_arrow_predicate)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path, read_file_fields, read_arrow_predicate)
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_lance_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_lance_sample.py
new file mode 100644
index 0000000000..643edec843
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_lance_sample.py
@@ -0,0 +1,210 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Example: REST Catalog + Ray Data + Lance Format Integration
+
+Demonstrates:
+1. REST catalog with Lance file format
+2. Writing data to Paimon table using Lance format
+3. Reading data using Ray Data for distributed processing
+4. Integration of REST catalog, Ray Data, and Lance format
+"""
+
+import tempfile
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+import ray
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+
+
+def main():
+ """REST catalog + Ray Data + Lance format integration example."""
+
+ # Initialize Ray
+ ray.init(ignore_reinit_error=True)
+ print("Ray initialized successfully")
+
+ # Setup mock REST server
+ temp_dir = tempfile.mkdtemp()
+ token = str(uuid.uuid4())
+ server = RESTCatalogServer(
+ data_path=temp_dir,
+ auth_provider=BearTokenAuthProvider(token),
+ config=ConfigResponse(defaults={"prefix": "mock-test"}),
+ warehouse="warehouse"
+ )
+ server.start()
+ print(f"REST server started at: {server.get_url()}")
+
+ try:
+ # Create REST catalog
+ catalog = CatalogFactory.create({
+ 'metastore': 'rest',
+ 'uri': f"http://localhost:{server.port}",
+ 'warehouse': "warehouse", # Must match server's warehouse
parameter
+ 'token.provider': 'bear',
+ 'token': token,
+ })
+ catalog.create_database("default", True)
+
+ # Create table schema with Lance format
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('name', pa.string()),
+ ('value', pa.float64()),
+ ('category', pa.string()),
+ ])
+
+ # Create schema with Lance file format option
+ schema = Schema.from_pyarrow_schema(
+ pa_schema=pa_schema,
+ primary_keys=['id'],
+ options={
+ 'bucket': '2',
+ CoreOptions.FILE_FORMAT: CoreOptions.FILE_FORMAT_LANCE,
+ }
+ )
+
+ table_name = 'default.ray_lance_example'
+ catalog.create_table(table_name, schema, False)
+ table = catalog.get_table(table_name)
+
+ table_path = table.table_path
+ print(f"\nTable path: {table_path}")
+ print(f"File format: {CoreOptions.FILE_FORMAT_LANCE}")
+
+ # Write data using Lance format
+ print("\nWriting data with Lance format...")
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ # Write multiple batches to demonstrate Lance format
+ data1 = pd.DataFrame({
+ 'id': [1, 2, 3, 4, 5],
+ 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+ 'value': [10.5, 20.3, 30.7, 40.1, 50.9],
+ 'category': ['A', 'B', 'A', 'B', 'A'],
+ })
+ table_write.write_pandas(data1)
+
+ data2 = pd.DataFrame({
+ 'id': [6, 7, 8],
+ 'name': ['Frank', 'Grace', 'Henry'],
+ 'value': [60.2, 70.4, 80.6],
+ 'category': ['B', 'A', 'B'],
+ })
+ table_write.write_pandas(data2)
+
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ print("Data written successfully with Lance format!")
+
+ # Read data using standard methods
+ print("\nReading data using standard methods...")
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+
+ # Read to Pandas
+ result_pandas = table_read.to_pandas(splits)
+ print("\nPandas DataFrame result:")
+ print(result_pandas)
+
+ # Read to Arrow
+ result_arrow = table_read.to_arrow(splits)
+ print(f"\nArrow Table result: {result_arrow.num_rows} rows")
+
+ # Read using Ray Data
+ print("\nReading data using Ray Data for distributed processing...")
+ ray_dataset = table_read.to_ray(splits)
+
+ print(f"Ray Dataset: {ray_dataset}")
+ print(f"Number of rows: {ray_dataset.count()}")
+
+ # Sample Ray Data operations
+ print("\nRay Data operations:")
+
+ # Take first few rows
+ sample_data = ray_dataset.take(3)
+ print(f"First 3 rows: {sample_data}")
+
+ # Filter data
+ filtered_dataset = ray_dataset.filter(lambda row: row['value'] > 30.0)
+ print(f"Filtered rows (value > 30): {filtered_dataset.count()}")
+
+ # Map operation
+ def double_value(row):
+ row['value'] = row['value'] * 2
+ return row
+
+ mapped_dataset = ray_dataset.map(double_value)
+ print(f"Mapped dataset (doubled values): {mapped_dataset.count()}
rows")
+
+ # Convert to Pandas
+ ray_pandas = ray_dataset.to_pandas()
+ print("\nRay Dataset converted to Pandas:")
+ print(ray_pandas)
+
+ # Group by category
+ print("\nGrouping by category using Ray Data:")
+ grouped = ray_dataset.groupby("category").sum("value")
+ print(grouped)
+
+ # Demonstrate predicate pushdown with Lance format
+ print("\nDemonstrating predicate pushdown with Lance format...")
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.greater_than('value', 30.0)
+ read_builder_filtered = read_builder.with_filter(predicate)
+
+ table_scan_filtered = read_builder_filtered.new_scan()
+ table_read_filtered = read_builder_filtered.new_read()
+ splits_filtered = table_scan_filtered.plan().splits()
+
+ result_filtered = table_read_filtered.to_pandas(splits_filtered)
+ print("\nFiltered result (value > 30):")
+ print(result_filtered)
+
+ # Demonstrate projection with Lance format
+ print("\nDemonstrating column projection with Lance format...")
+ read_builder_projected = read_builder.with_projection(['id', 'name', 'value'])
+ table_scan_projected = read_builder_projected.new_scan()
+ table_read_projected = read_builder_projected.new_read()
+ splits_projected = table_scan_projected.plan().splits()
+
+ result_projected = table_read_projected.to_pandas(splits_projected)
+ print("\nProjected result (id, name, value only):")
+ print(result_projected)
+
+ finally:
+ server.shutdown()
+ ray.shutdown()
+ print("\nServer stopped and Ray shutdown")
+
+
+if __name__ == '__main__':
+ main()
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 3a99196854..90d0924ae8 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -83,6 +83,37 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_lance_ao_reader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'lance'})
+ self.catalog.create_table('default.test_append_only_lance', schema, False)
+ table = self.catalog.get_table('default.test_append_only_lance')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
+ def test_lance_ao_reader_with_filter(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'lance'})
+ self.catalog.create_table('default.test_append_only_lance_filter', schema, False)
+ table = self.catalog.get_table('default.test_append_only_lance_filter')
+ self._write_test_table(table)
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.less_than('user_id', 7)
+ p2 = predicate_builder.greater_or_equal('user_id', 2)
+ p3 = predicate_builder.between('user_id', 0, 6) # [2/b, 3/c, 4/d, 5/e, 6/f] left
+ p4 = predicate_builder.is_not_in('behavior', ['b', 'e']) # [3/c, 4/d, 6/f] left
+ p5 = predicate_builder.is_in('dt', ['p1']) # exclude 3/c
+ p6 = predicate_builder.is_not_null('behavior') # exclude 4/d
+ g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+ read_builder = table.new_read_builder().with_filter(g1)
+ actual = self._read_test_table(read_builder)
+ expected = pa.concat_tables([
+ self.expected.slice(5, 1) # 6/f
+ ])
+ self.assertEqual(actual.sort_by('user_id'), expected)
+
def test_append_only_multi_write_once_commit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index bcbc94bd68..be35844c70 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -106,6 +106,61 @@ class PkReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_pk_lance_reader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={
+ 'bucket': '2',
+ 'file.format': 'lance'
+ })
+ self.catalog.create_table('default.test_pk_lance', schema, False)
+ table = self.catalog.get_table('default.test_pk_lance')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ splits = table_scan.plan().splits()
+
+ for split in splits:
+ for file in split.files:
+ file_path = file.file_path
+ table_path = os.path.join(self.warehouse, 'default.db', 'test_pk_lance')
+ full_path = os.path.join(table_path, file_path)
+ if os.path.exists(full_path):
+ self.assertTrue(os.path.exists(full_path))
+ self.assertTrue(
+ file_path.endswith('.lance'),
+ f"Expected file path to end with .lance, got
{file_path}")
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
+ def test_pk_lance_reader_with_filter(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={
+ 'bucket': '2',
+ 'file.format': 'lance'
+ })
+ self.catalog.create_table('default.test_pk_lance_filter', schema, False)
+ table = self.catalog.get_table('default.test_pk_lance_filter')
+ self._write_test_table(table)
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.is_in('dt', ['p1'])
+ p2 = predicate_builder.between('user_id', 2, 7)
+ p3 = predicate_builder.is_not_null('behavior')
+ g1 = predicate_builder.and_predicates([p1, p2, p3])
+ read_builder = table.new_read_builder().with_filter(g1)
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ expected = pa.concat_tables([
+ self.expected.slice(1, 1), # 2/b
+ self.expected.slice(5, 1) # 7/g
+ ])
+ self.assertEqual(actual, expected)
+
def test_pk_multi_write_once_commit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py
index e45fac5cde..7f3e44afcb 100644
--- a/paimon-python/pypaimon/tests/rest/rest_base_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py
@@ -227,3 +227,39 @@ class RESTBaseTest(unittest.TestCase):
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
return table_read.to_arrow(splits)
+
+ def _write_test_table_with_schema(self, table, pa_schema):
+ """Write test data using the specified PyArrow schema."""
+ write_builder = table.new_batch_write_builder()
+
+ # first write
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ 'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # second write
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ 'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
index 84c6557c1e..baec93fad1 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
@@ -216,6 +216,80 @@ class TestRESTCatalogCommitSnapshot(unittest.TestCase):
# Verify client was called correctly
mock_client.post_with_response_type.assert_called_once()
+ def test_rest_catalog_commit_snapshot_with_lance_format(self):
+ """Test snapshot commit with Lance format table."""
+ from pypaimon import Schema
+ import pyarrow as pa
+ import tempfile
+ import shutil
+ from pypaimon.api.api_response import ConfigResponse
+ from pypaimon.api.auth import BearTokenAuthProvider
+ from pypaimon.tests.rest.rest_server import RESTCatalogServer
+ import uuid
+
+ temp_dir = tempfile.mkdtemp(prefix="rest_lance_test_")
+ try:
+ config = ConfigResponse(defaults={"prefix": "mock-test"})
+ token = str(uuid.uuid4())
+ server = RESTCatalogServer(
+ data_path=temp_dir,
+ auth_provider=BearTokenAuthProvider(token),
+ config=config,
+ warehouse="warehouse"
+ )
+ server.start()
+
+ options = {
+ 'metastore': 'rest',
+ 'uri': f"http://localhost:{server.port}",
+ 'warehouse': "warehouse",
+ 'dlf.region': 'cn-hangzhou',
+ "token.provider": "bear",
+ 'token': token,
+ }
+ catalog = RESTCatalog(CatalogContext.create_from_options(Options(options)))
+ catalog.create_database("default", False)
+
+ # Create table with Lance format
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={'file.format': 'lance'}
+ )
+ identifier = Identifier.create("default", "test_lance_table")
+ catalog.create_table(identifier, schema, False)
+
+ # Write data and commit
+ table = catalog.get_table(identifier)
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c']
+ }, schema=pa_schema)
+ table_write.write_arrow(data)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ # Verify commit was successful by reading the data back
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits)
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c'])
+ finally:
+ server.shutdown()
+ shutil.rmtree(temp_dir, ignore_errors=True)
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index 9f30f90012..96c1529dbc 100644
--- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -20,6 +20,7 @@ import logging
import pandas as pd
import pyarrow as pa
+import unittest
from pypaimon.api.options import Options
from pypaimon.catalog.catalog_context import CatalogContext
@@ -145,6 +146,16 @@ class RESTTableReadWriteTest(RESTBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_lance_ao_reader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'lance'})
+ self.rest_catalog.create_table('default.test_append_only_lance', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_lance')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
def test_ao_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
@@ -283,6 +294,92 @@ class RESTTableReadWriteTest(RESTBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_pk_lance_reader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={
+ 'bucket': '2',
+ 'file.format': 'lance'
+ })
+ self.rest_catalog.drop_table('default.test_pk_lance', True)
+ self.rest_catalog.create_table('default.test_pk_lance', schema, False)
+ table = self.rest_catalog.get_table('default.test_pk_lance')
+ # Use table's schema for writing to ensure schema consistency
+ from pypaimon.schema.data_types import PyarrowFieldParser
+ table_pa_schema = PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+ self._write_test_table_with_schema(table, table_pa_schema)
+
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
+ def test_lance_ao_reader_with_filter(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'lance'})
+ self.rest_catalog.create_table('default.test_append_only_lance_filter', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_lance_filter')
+ self._write_test_table(table)
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.less_than('user_id', 7)
+ p2 = predicate_builder.greater_or_equal('user_id', 2)
+ p3 = predicate_builder.between('user_id', 0, 6)
+ p4 = predicate_builder.is_not_in('behavior', ['b', 'e'])
+ p5 = predicate_builder.is_in('dt', ['p1'])
+ p6 = predicate_builder.is_not_null('behavior')
+ g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+ read_builder = table.new_read_builder().with_filter(g1)
+ actual = self._read_test_table(read_builder)
+ expected = pa.concat_tables([
+ self.expected.slice(5, 1) # 6/f
+ ])
+ self.assertEqual(actual.sort_by('user_id'), expected)
+
+ def test_pk_lance_reader_with_filter(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={
+ 'bucket': '2',
+ 'file.format': 'lance'
+ })
+ self.rest_catalog.create_table('default.test_pk_lance_filter', schema, False)
+ table = self.rest_catalog.get_table('default.test_pk_lance_filter')
+ from pypaimon.schema.data_types import PyarrowFieldParser
+ table_pa_schema = PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+ self._write_test_table_with_schema(table, table_pa_schema)
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.is_in('dt', ['p1'])
+ p2 = predicate_builder.between('user_id', 2, 7)
+ p3 = predicate_builder.is_not_null('behavior')
+ g1 = predicate_builder.and_predicates([p1, p2, p3])
+ read_builder = table.new_read_builder().with_filter(g1)
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ expected = pa.concat_tables([
+ self.expected.slice(1, 1), # 2/b
+ self.expected.slice(5, 1) # 7/g
+ ])
+ self.assertEqual(actual, expected)
+
+ @unittest.skip("does not support dynamic bucket in dummy rest server")
+ def test_pk_lance_reader_no_bucket(self):
+ """Test Lance format with PrimaryKey table without specifying
bucket."""
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'file.format': 'lance'})
+ self.rest_catalog.drop_table('default.test_pk_lance_no_bucket', True)
+ self.rest_catalog.create_table('default.test_pk_lance_no_bucket', schema, False)
+ table = self.rest_catalog.get_table('default.test_pk_lance_no_bucket')
+ from pypaimon.schema.data_types import PyarrowFieldParser
+ table_pa_schema = PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+ self._write_test_table_with_schema(table, table_pa_schema)
+
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
def test_pk_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 6b81525c7f..65107856ee 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -256,6 +256,8 @@ class DataBlobWriter(DataWriter):
self.file_io.write_orc(file_path, data, compression=self.compression)
elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
self.file_io.write_avro(file_path, data)
+ elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
+ self.file_io.write_lance(file_path, data)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 079b8d26d6..8656a58879 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -176,6 +176,8 @@ class DataWriter(ABC):
self.file_io.write_avro(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_BLOB:
self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
+ elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
+ self.file_io.write_lance(file_path, data)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")