This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 05a3bd42c4 [python] Introduce lance file format support to pypaimon 
(#6746)
05a3bd42c4 is described below

commit 05a3bd42c4c25eea43fb8a8129311aeca3ee3d3f
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Dec 5 09:19:08 2025 +0800

    [python] Introduce lance file format support to pypaimon (#6746)
---
 .github/workflows/paimon-python-checks.yml         |   2 +-
 docs/content/concepts/spec/fileformat.md           |  88 ++++++++-
 paimon-python/dev/requirements.txt                 |   4 +-
 paimon-python/pypaimon/common/core_options.py      |   1 +
 paimon-python/pypaimon/common/file_io.py           |  18 ++
 .../pypaimon/read/reader/format_lance_reader.py    |  72 +++++++
 paimon-python/pypaimon/read/reader/lance_utils.py  |  58 ++++++
 paimon-python/pypaimon/read/split_read.py          |   4 +
 .../sample/rest_catalog_ray_lance_sample.py        | 210 +++++++++++++++++++++
 .../pypaimon/tests/reader_append_only_test.py      |  31 +++
 .../pypaimon/tests/reader_primary_key_test.py      |  55 ++++++
 .../pypaimon/tests/rest/rest_base_test.py          |  36 ++++
 .../rest/rest_catalog_commit_snapshot_test.py      |  74 ++++++++
 .../pypaimon/tests/rest/rest_read_write_test.py    |  97 ++++++++++
 .../pypaimon/write/writer/data_blob_writer.py      |   2 +
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +
 16 files changed, 751 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 80277e701c..e6a10e8949 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -94,7 +94,7 @@ jobs:
             python -m pip install -q readerwriterlock==1.0.9 
'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 
pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 
dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 
2>&1 >/dev/null
           else
             python -m pip install --upgrade pip
-            python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 
cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 
zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 
flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 
>/dev/null
+            python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 
cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 
zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 
pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 
parameterized==0.9.0 2>&1 >/dev/null
           fi
       - name: Run lint-python.sh
         shell: bash
diff --git a/docs/content/concepts/spec/fileformat.md 
b/docs/content/concepts/spec/fileformat.md
index 5c69b494cc..4a5d2a4d7c 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -26,10 +26,11 @@ under the License.
 
 # File Format
 
-Currently, supports Parquet, Avro, ORC, CSV, JSON file formats.
+Currently, supports Parquet, Avro, ORC, CSV, JSON, and Lance file formats.
 - Recommended column format is Parquet, which has a high compression rate and 
fast column projection queries.
 - Recommended row based format is Avro, which has good performance n reading 
and writing full row (all columns).
 - Recommended testing format is CSV, which has better readability but the 
worst read-write performance.
+- Recommended format for ML workloads is Lance, which is optimized for vector 
search and machine learning use cases.
 
 ## PARQUET
 
@@ -640,3 +641,88 @@ The following table lists the type mapping from Paimon 
type to JSON type.
     </tr>
     </tbody>
 </table>
+
+## LANCE
+
+Lance is a modern columnar data format optimized for machine learning and 
vector search workloads. It provides high-performance read and write operations 
with native support for Apache Arrow.
+
+The following table lists the type mapping from Paimon type to Lance (Arrow) 
type.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Paimon Type</th>
+        <th class="text-center">Lance (Arrow) type</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>CHAR / VARCHAR / STRING</td>
+      <td>UTF8</td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>BOOL</td>
+    </tr>
+    <tr>
+      <td>BINARY / VARBINARY</td>
+      <td>BINARY</td>
+    </tr>
+    <tr>
+      <td>DECIMAL(P, S)</td>
+      <td>DECIMAL128(P, S)</td>
+    </tr>
+    <tr>
+      <td>TINYINT</td>
+      <td>INT8</td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>INT16</td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>INT32</td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>INT64</td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>FLOAT</td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>DOUBLE</td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>DATE32</td>
+    </tr>
+    <tr>
+      <td>TIME</td>
+      <td>TIME32 / TIME64</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP(P)</td>
+      <td>TIMESTAMP (unit based on precision)</td>
+    </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>LIST</td>
+    </tr>
+    <tr>
+      <td>MULTISET</td>
+      <td>LIST</td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>STRUCT</td>
+    </tr>
+    </tbody>
+</table>
+
+Limitations:
+1. Lance file format does not support `MAP` type.
+2. Lance file format does not support `TIMESTAMP_LOCAL_ZONE` type.
diff --git a/paimon-python/dev/requirements.txt 
b/paimon-python/dev/requirements.txt
index a08a0ba44a..342d7a40f4 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -38,4 +38,6 @@ pyarrow==16; python_version >= "3.8"
 ray==2.48.0
 readerwriterlock==1.0.9
 zstandard==0.19.0; python_version<"3.9"
-zstandard==0.24.0; python_version>="3.9"
\ No newline at end of file
+zstandard==0.24.0; python_version>="3.9"
+pylance==0.39.0; python_version>="3.9"
+pylance==0.10.18; python_version>="3.8" and python_version<"3.9"
diff --git a/paimon-python/pypaimon/common/core_options.py 
b/paimon-python/pypaimon/common/core_options.py
index 0686132979..0e3fa1bacb 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -43,6 +43,7 @@ class CoreOptions(str, Enum):
     FILE_FORMAT_AVRO = "avro"
     FILE_FORMAT_PARQUET = "parquet"
     FILE_FORMAT_BLOB = "blob"
+    FILE_FORMAT_LANCE = "lance"
     FILE_COMPRESSION = "file.compression"
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
diff --git a/paimon-python/pypaimon/common/file_io.py 
b/paimon-python/pypaimon/common/file_io.py
index 15e1d8d8d9..1220074164 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -383,6 +383,24 @@ class FileIO:
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
 
+    def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import lance
+            from pypaimon.read.reader.lance_utils import to_lance_specified
+            file_path_for_lance, storage_options = to_lance_specified(self, 
path)
+
+            writer = lance.file.LanceFileWriter(
+                file_path_for_lance, data.schema, 
storage_options=storage_options, **kwargs)
+            try:
+                # Write all batches
+                for batch in data.to_batches():
+                    writer.write_batch(batch)
+            finally:
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
+
     def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: 
bool, **kwargs):
         try:
             # Validate input constraints
diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py 
b/paimon-python/pypaimon/read/reader/format_lance_reader.py
new file mode 100644
index 0000000000..a6b3277167
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+from typing import List, Optional, Any
+
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.lance_utils import to_lance_specified
+
+
+class FormatLanceReader(RecordBatchReader):
+    """
+    A Format Reader that reads record batch from a Lance file using PyArrow,
+    and filters it based on the provided predicate and projection.
+    """
+
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+                 push_down_predicate: Any, batch_size: int = 4096):
+        """Initialize Lance reader."""
+        import lance
+
+        file_path_for_lance, storage_options = to_lance_specified(file_io, 
file_path)
+
+        columns_for_lance = read_fields if read_fields else None
+        lance_reader = lance.file.LanceFileReader(
+            file_path_for_lance,
+            storage_options=storage_options,
+            columns=columns_for_lance)
+        reader_results = lance_reader.read_all()
+
+        # Convert to PyArrow table
+        pa_table = reader_results.to_table()
+
+        if push_down_predicate is not None:
+            in_memory_dataset = ds.InMemoryDataset(pa_table)
+            scanner = in_memory_dataset.scanner(filter=push_down_predicate, 
batch_size=batch_size)
+            self.reader = scanner.to_reader()
+        else:
+            self.reader = iter(pa_table.to_batches(max_chunksize=batch_size))
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        try:
+            # Handle both scanner reader and iterator
+            if hasattr(self.reader, 'read_next_batch'):
+                return self.reader.read_next_batch()
+            else:
+                # Iterator of batches
+                return next(self.reader)
+        except StopIteration:
+            return None
+
+    def close(self):
+        if self.reader is not None:
+            self.reader = None
diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py 
b/paimon-python/pypaimon/read/reader/lance_utils.py
new file mode 100644
index 0000000000..c7d768e631
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/lance_utils.py
@@ -0,0 +1,58 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import os
+from typing import Dict, Optional, Tuple
+
+from pypaimon.common.config import OssOptions
+from pypaimon.common.file_io import FileIO
+
+
+def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, 
Optional[Dict[str, str]]]:
+    """Convert path and extract storage options for Lance format."""
+    scheme, _, _ = file_io.parse_location(file_path)
+    storage_options = None
+    file_path_for_lance = file_io.to_filesystem_path(file_path)
+
+    if scheme in {'file', None} or not scheme:
+        if not os.path.isabs(file_path_for_lance):
+            file_path_for_lance = os.path.abspath(file_path_for_lance)
+    else:
+        file_path_for_lance = file_path
+
+    if scheme == 'oss':
+        storage_options = {}
+        if hasattr(file_io, 'properties'):
+            endpoint = file_io.properties.get(OssOptions.OSS_ENDPOINT)
+            if endpoint:
+                if not endpoint.startswith('http://') and not 
endpoint.startswith('https://'):
+                    storage_options['endpoint'] = f"https://{endpoint}";
+                else:
+                    storage_options['endpoint'] = endpoint
+
+            if OssOptions.OSS_ACCESS_KEY_ID in file_io.properties:
+                storage_options['access_key_id'] = 
file_io.properties[OssOptions.OSS_ACCESS_KEY_ID]
+            if OssOptions.OSS_ACCESS_KEY_SECRET in file_io.properties:
+                storage_options['secret_access_key'] = 
file_io.properties[OssOptions.OSS_ACCESS_KEY_SECRET]
+            if OssOptions.OSS_SECURITY_TOKEN in file_io.properties:
+                storage_options['session_token'] = 
file_io.properties[OssOptions.OSS_SECURITY_TOKEN]
+            storage_options['virtual_hosted_style_request'] = 'true'
+
+        file_path_for_lance = file_path.replace('oss://', 's3://')
+
+    return file_path_for_lance, storage_options
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 92152db7ee..94d832eff4 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -37,6 +37,7 @@ from pypaimon.read.reader.empty_record_reader import 
EmptyFileRecordReader
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -101,6 +102,9 @@ class SplitRead(ABC):
             blob_as_descriptor = 
CoreOptions.blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, 
read_file_fields,
                                              self.read_fields, 
read_arrow_predicate, blob_as_descriptor)
+        elif file_format == CoreOptions.FILE_FORMAT_LANCE:
+            format_reader = FormatLanceReader(self.table.file_io, file_path, 
read_file_fields,
+                                              read_arrow_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, 
file_format, file_path,
                                                 read_file_fields, 
read_arrow_predicate)
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_lance_sample.py 
b/paimon-python/pypaimon/sample/rest_catalog_ray_lance_sample.py
new file mode 100644
index 0000000000..643edec843
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_lance_sample.py
@@ -0,0 +1,210 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+"""
+Example: REST Catalog + Ray Data + Lance Format Integration
+
+Demonstrates:
+1. REST catalog with Lance file format
+2. Writing data to Paimon table using Lance format
+3. Reading data using Ray Data for distributed processing
+4. Integration of REST catalog, Ray Data, and Lance format
+"""
+
+import tempfile
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+import ray
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+
+
+def main():
+    """REST catalog + Ray Data + Lance format integration example."""
+
+    # Initialize Ray
+    ray.init(ignore_reinit_error=True)
+    print("Ray initialized successfully")
+
+    # Setup mock REST server
+    temp_dir = tempfile.mkdtemp()
+    token = str(uuid.uuid4())
+    server = RESTCatalogServer(
+        data_path=temp_dir,
+        auth_provider=BearTokenAuthProvider(token),
+        config=ConfigResponse(defaults={"prefix": "mock-test"}),
+        warehouse="warehouse"
+    )
+    server.start()
+    print(f"REST server started at: {server.get_url()}")
+
+    try:
+        # Create REST catalog
+        catalog = CatalogFactory.create({
+            'metastore': 'rest',
+            'uri': f"http://localhost:{server.port}";,
+            'warehouse': "warehouse",  # Must match server's warehouse 
parameter
+            'token.provider': 'bear',
+            'token': token,
+        })
+        catalog.create_database("default", True)
+
+        # Create table schema with Lance format
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('value', pa.float64()),
+            ('category', pa.string()),
+        ])
+
+        # Create schema with Lance file format option
+        schema = Schema.from_pyarrow_schema(
+            pa_schema=pa_schema,
+            primary_keys=['id'],
+            options={
+                'bucket': '2',
+                CoreOptions.FILE_FORMAT: CoreOptions.FILE_FORMAT_LANCE,
+            }
+        )
+
+        table_name = 'default.ray_lance_example'
+        catalog.create_table(table_name, schema, False)
+        table = catalog.get_table(table_name)
+
+        table_path = table.table_path
+        print(f"\nTable path: {table_path}")
+        print(f"File format: {CoreOptions.FILE_FORMAT_LANCE}")
+
+        # Write data using Lance format
+        print("\nWriting data with Lance format...")
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        # Write multiple batches to demonstrate Lance format
+        data1 = pd.DataFrame({
+            'id': [1, 2, 3, 4, 5],
+            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+            'value': [10.5, 20.3, 30.7, 40.1, 50.9],
+            'category': ['A', 'B', 'A', 'B', 'A'],
+        })
+        table_write.write_pandas(data1)
+
+        data2 = pd.DataFrame({
+            'id': [6, 7, 8],
+            'name': ['Frank', 'Grace', 'Henry'],
+            'value': [60.2, 70.4, 80.6],
+            'category': ['B', 'A', 'B'],
+        })
+        table_write.write_pandas(data2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+        print("Data written successfully with Lance format!")
+
+        # Read data using standard methods
+        print("\nReading data using standard methods...")
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+
+        # Read to Pandas
+        result_pandas = table_read.to_pandas(splits)
+        print("\nPandas DataFrame result:")
+        print(result_pandas)
+
+        # Read to Arrow
+        result_arrow = table_read.to_arrow(splits)
+        print(f"\nArrow Table result: {result_arrow.num_rows} rows")
+
+        # Read using Ray Data
+        print("\nReading data using Ray Data for distributed processing...")
+        ray_dataset = table_read.to_ray(splits)
+
+        print(f"Ray Dataset: {ray_dataset}")
+        print(f"Number of rows: {ray_dataset.count()}")
+
+        # Sample Ray Data operations
+        print("\nRay Data operations:")
+
+        # Take first few rows
+        sample_data = ray_dataset.take(3)
+        print(f"First 3 rows: {sample_data}")
+
+        # Filter data
+        filtered_dataset = ray_dataset.filter(lambda row: row['value'] > 30.0)
+        print(f"Filtered rows (value > 30): {filtered_dataset.count()}")
+
+        # Map operation
+        def double_value(row):
+            row['value'] = row['value'] * 2
+            return row
+
+        mapped_dataset = ray_dataset.map(double_value)
+        print(f"Mapped dataset (doubled values): {mapped_dataset.count()} 
rows")
+
+        # Convert to Pandas
+        ray_pandas = ray_dataset.to_pandas()
+        print("\nRay Dataset converted to Pandas:")
+        print(ray_pandas)
+
+        # Group by category
+        print("\nGrouping by category using Ray Data:")
+        grouped = ray_dataset.groupby("category").sum("value")
+        print(grouped)
+
+        # Demonstrate predicate pushdown with Lance format
+        print("\nDemonstrating predicate pushdown with Lance format...")
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.greater_than('value', 30.0)
+        read_builder_filtered = read_builder.with_filter(predicate)
+
+        table_scan_filtered = read_builder_filtered.new_scan()
+        table_read_filtered = read_builder_filtered.new_read()
+        splits_filtered = table_scan_filtered.plan().splits()
+
+        result_filtered = table_read_filtered.to_pandas(splits_filtered)
+        print("\nFiltered result (value > 30):")
+        print(result_filtered)
+
+        # Demonstrate projection with Lance format
+        print("\nDemonstrating column projection with Lance format...")
+        read_builder_projected = read_builder.with_projection(['id', 'name', 
'value'])
+        table_scan_projected = read_builder_projected.new_scan()
+        table_read_projected = read_builder_projected.new_read()
+        splits_projected = table_scan_projected.plan().splits()
+
+        result_projected = table_read_projected.to_pandas(splits_projected)
+        print("\nProjected result (id, name, value only):")
+        print(result_projected)
+
+    finally:
+        server.shutdown()
+        ray.shutdown()
+        print("\nServer stopped and Ray shutdown")
+
+
+if __name__ == '__main__':
+    main()
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 3a99196854..90d0924ae8 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -83,6 +83,37 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_lance_ao_reader(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
+        self.catalog.create_table('default.test_append_only_lance', schema, 
False)
+        table = self.catalog.get_table('default.test_append_only_lance')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
+    def test_lance_ao_reader_with_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
+        self.catalog.create_table('default.test_append_only_lance_filter', 
schema, False)
+        table = self.catalog.get_table('default.test_append_only_lance_filter')
+        self._write_test_table(table)
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        p1 = predicate_builder.less_than('user_id', 7)
+        p2 = predicate_builder.greater_or_equal('user_id', 2)
+        p3 = predicate_builder.between('user_id', 0, 6)  # [2/b, 3/c, 4/d, 
5/e, 6/f] left
+        p4 = predicate_builder.is_not_in('behavior', ['b', 'e'])  # [3/c, 4/d, 
6/f] left
+        p5 = predicate_builder.is_in('dt', ['p1'])  # exclude 3/c
+        p6 = predicate_builder.is_not_null('behavior')  # exclude 4/d
+        g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+        read_builder = table.new_read_builder().with_filter(g1)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected.slice(5, 1)  # 6/f
+        ])
+        self.assertEqual(actual.sort_by('user_id'), expected)
+
     def test_append_only_multi_write_once_commit(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'])
         
self.catalog.create_table('default.test_append_only_multi_once_commit', schema, 
False)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py 
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index bcbc94bd68..be35844c70 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -106,6 +106,61 @@ class PkReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_pk_lance_reader(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={
+                                                'bucket': '2',
+                                                'file.format': 'lance'
+                                            })
+        self.catalog.create_table('default.test_pk_lance', schema, False)
+        table = self.catalog.get_table('default.test_pk_lance')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+
+        for split in splits:
+            for file in split.files:
+                file_path = file.file_path
+                table_path = os.path.join(self.warehouse, 'default.db', 
'test_pk_lance')
+                full_path = os.path.join(table_path, file_path)
+                if os.path.exists(full_path):
+                    self.assertTrue(os.path.exists(full_path))
+                    self.assertTrue(
+                        file_path.endswith('.lance'),
+                        f"Expected file path to end with .lance, got 
{file_path}")
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
+    def test_pk_lance_reader_with_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={
+                                                'bucket': '2',
+                                                'file.format': 'lance'
+                                            })
+        self.catalog.create_table('default.test_pk_lance_filter', schema, 
False)
+        table = self.catalog.get_table('default.test_pk_lance_filter')
+        self._write_test_table(table)
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        p1 = predicate_builder.is_in('dt', ['p1'])
+        p2 = predicate_builder.between('user_id', 2, 7)
+        p3 = predicate_builder.is_not_null('behavior')
+        g1 = predicate_builder.and_predicates([p1, p2, p3])
+        read_builder = table.new_read_builder().with_filter(g1)
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        expected = pa.concat_tables([
+            self.expected.slice(1, 1),  # 2/b
+            self.expected.slice(5, 1)  # 7/g
+        ])
+        self.assertEqual(actual, expected)
+
     def test_pk_multi_write_once_commit(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py 
b/paimon-python/pypaimon/tests/rest/rest_base_test.py
index e45fac5cde..7f3e44afcb 100644
--- a/paimon-python/pypaimon/tests/rest/rest_base_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py
@@ -227,3 +227,39 @@ class RESTBaseTest(unittest.TestCase):
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
         return table_read.to_arrow(splits)
+
+    def _write_test_table_with_schema(self, table, pa_schema):
+        """Write test data using the specified PyArrow schema."""
+        write_builder = table.new_batch_write_builder()
+
+        # first write
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+            'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', 
'2024-01-01'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # second write
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', 
'2025-08-08'],
+        }
+        pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
diff --git 
a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py 
b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
index 84c6557c1e..baec93fad1 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
@@ -216,6 +216,80 @@ class TestRESTCatalogCommitSnapshot(unittest.TestCase):
                 # Verify client was called correctly
                 mock_client.post_with_response_type.assert_called_once()
 
+    def test_rest_catalog_commit_snapshot_with_lance_format(self):
+        """Test snapshot commit with Lance format table."""
+        from pypaimon import Schema
+        import pyarrow as pa
+        import tempfile
+        import shutil
+        from pypaimon.api.api_response import ConfigResponse
+        from pypaimon.api.auth import BearTokenAuthProvider
+        from pypaimon.tests.rest.rest_server import RESTCatalogServer
+        import uuid
+
+        temp_dir = tempfile.mkdtemp(prefix="rest_lance_test_")
+        try:
+            config = ConfigResponse(defaults={"prefix": "mock-test"})
+            token = str(uuid.uuid4())
+            server = RESTCatalogServer(
+                data_path=temp_dir,
+                auth_provider=BearTokenAuthProvider(token),
+                config=config,
+                warehouse="warehouse"
+            )
+            server.start()
+
+            options = {
+                'metastore': 'rest',
+                'uri': f"http://localhost:{server.port}";,
+                'warehouse': "warehouse",
+                'dlf.region': 'cn-hangzhou',
+                "token.provider": "bear",
+                'token': token,
+            }
+            catalog = 
RESTCatalog(CatalogContext.create_from_options(Options(options)))
+            catalog.create_database("default", False)
+
+            # Create table with Lance format
+            pa_schema = pa.schema([
+                ('id', pa.int32()),
+                ('name', pa.string())
+            ])
+            schema = Schema.from_pyarrow_schema(
+                pa_schema,
+                options={'file.format': 'lance'}
+            )
+            identifier = Identifier.create("default", "test_lance_table")
+            catalog.create_table(identifier, schema, False)
+
+            # Write data and commit
+            table = catalog.get_table(identifier)
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            data = pa.Table.from_pydict({
+                'id': [1, 2, 3],
+                'name': ['a', 'b', 'c']
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            commit_messages = table_write.prepare_commit()
+            table_commit.commit(commit_messages)
+            table_write.close()
+            table_commit.close()
+
+            # Verify commit was successful by reading the data back
+            read_builder = table.new_read_builder()
+            table_read = read_builder.new_read()
+            splits = read_builder.new_scan().plan().splits()
+            actual = table_read.to_arrow(splits)
+            self.assertEqual(actual.num_rows, 3)
+            self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+            self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 
'c'])
+        finally:
+            server.shutdown()
+            shutil.rmtree(temp_dir, ignore_errors=True)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py 
b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index 9f30f90012..96c1529dbc 100644
--- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -20,6 +20,7 @@ import logging
 
 import pandas as pd
 import pyarrow as pa
+import unittest
 
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
@@ -145,6 +146,16 @@ class RESTTableReadWriteTest(RESTBaseTest):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_lance_ao_reader(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
+        self.rest_catalog.create_table('default.test_append_only_lance', 
schema, False)
+        table = self.rest_catalog.get_table('default.test_append_only_lance')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def test_ao_reader_with_filter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_filter', 
schema, False)
@@ -283,6 +294,92 @@ class RESTTableReadWriteTest(RESTBaseTest):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_pk_lance_reader(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={
+                                                'bucket': '2',
+                                                'file.format': 'lance'
+                                            })
+        self.rest_catalog.drop_table('default.test_pk_lance', True)
+        self.rest_catalog.create_table('default.test_pk_lance', schema, False)
+        table = self.rest_catalog.get_table('default.test_pk_lance')
+        # Use table's schema for writing to ensure schema consistency
+        from pypaimon.schema.data_types import PyarrowFieldParser
+        table_pa_schema = 
PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+        self._write_test_table_with_schema(table, table_pa_schema)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
+    def test_lance_ao_reader_with_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
+        
self.rest_catalog.create_table('default.test_append_only_lance_filter', schema, 
False)
+        table = 
self.rest_catalog.get_table('default.test_append_only_lance_filter')
+        self._write_test_table(table)
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        p1 = predicate_builder.less_than('user_id', 7)
+        p2 = predicate_builder.greater_or_equal('user_id', 2)
+        p3 = predicate_builder.between('user_id', 0, 6)
+        p4 = predicate_builder.is_not_in('behavior', ['b', 'e'])
+        p5 = predicate_builder.is_in('dt', ['p1'])
+        p6 = predicate_builder.is_not_null('behavior')
+        g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+        read_builder = table.new_read_builder().with_filter(g1)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected.slice(5, 1)  # 6/f
+        ])
+        self.assertEqual(actual.sort_by('user_id'), expected)
+
+    def test_pk_lance_reader_with_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={
+                                                'bucket': '2',
+                                                'file.format': 'lance'
+                                            })
+        self.rest_catalog.create_table('default.test_pk_lance_filter', schema, 
False)
+        table = self.rest_catalog.get_table('default.test_pk_lance_filter')
+        from pypaimon.schema.data_types import PyarrowFieldParser
+        table_pa_schema = 
PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+        self._write_test_table_with_schema(table, table_pa_schema)
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        p1 = predicate_builder.is_in('dt', ['p1'])
+        p2 = predicate_builder.between('user_id', 2, 7)
+        p3 = predicate_builder.is_not_null('behavior')
+        g1 = predicate_builder.and_predicates([p1, p2, p3])
+        read_builder = table.new_read_builder().with_filter(g1)
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        expected = pa.concat_tables([
+            self.expected.slice(1, 1),  # 2/b
+            self.expected.slice(5, 1)  # 7/g
+        ])
+        self.assertEqual(actual, expected)
+
+    @unittest.skip("does not support dynamic bucket in dummy rest server")
+    def test_pk_lance_reader_no_bucket(self):
+        """Test Lance format with PrimaryKey table without specifying 
bucket."""
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'file.format': 'lance'})
+        self.rest_catalog.drop_table('default.test_pk_lance_no_bucket', True)
+        self.rest_catalog.create_table('default.test_pk_lance_no_bucket', 
schema, False)
+        table = self.rest_catalog.get_table('default.test_pk_lance_no_bucket')
+        from pypaimon.schema.data_types import PyarrowFieldParser
+        table_pa_schema = 
PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+        self._write_test_table_with_schema(table, table_pa_schema)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def test_pk_reader_with_filter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 6b81525c7f..65107856ee 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -256,6 +256,8 @@ class DataBlobWriter(DataWriter):
             self.file_io.write_orc(file_path, data, 
compression=self.compression)
         elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
             self.file_io.write_avro(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
+            self.file_io.write_lance(file_path, data)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index 079b8d26d6..8656a58879 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -176,6 +176,8 @@ class DataWriter(ABC):
             self.file_io.write_avro(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_BLOB:
             self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
+        elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
+            self.file_io.write_lance(file_path, data)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 


Reply via email to