This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87bdda4111 [core]Python: fix blob write when blob_as_descriptor is true (#6404)
87bdda4111 is described below
commit 87bdda4111a214f69d16a2fce748cabc800fc76d
Author: jerry <[email protected]>
AuthorDate: Fri Oct 17 10:11:40 2025 +0800
[core]Python: fix blob write when blob_as_descriptor is true (#6404)
---
paimon-python/pypaimon/common/config.py | 1 +
paimon-python/pypaimon/common/core_options.py | 1 +
paimon-python/pypaimon/common/file_io.py | 13 +-
paimon-python/pypaimon/common/uri_reader.py | 171 ++++++++++++++++
.../pypaimon/read/reader/format_blob_reader.py | 48 ++---
paimon-python/pypaimon/read/split_read.py | 3 +-
paimon-python/pypaimon/table/row/blob.py | 61 +++---
paimon-python/pypaimon/tests/blob_test.py | 153 ++++++++++----
.../pypaimon/tests/uri_reader_factory_test.py | 227 +++++++++++++++++++++
paimon-python/pypaimon/write/writer/data_writer.py | 3 +-
10 files changed, 582 insertions(+), 99 deletions(-)
diff --git a/paimon-python/pypaimon/common/config.py b/paimon-python/pypaimon/common/config.py
index 0478c207bb..81b05b8f84 100644
--- a/paimon-python/pypaimon/common/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -47,6 +47,7 @@ class CatalogOptions:
DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
PREFIX = 'prefix'
HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
+ BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1
class PVFSOptions:
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 82b788438e..8ab26fe062 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -43,6 +43,7 @@ class CoreOptions(str, Enum):
FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
FILE_FORMAT_PER_LEVEL = "file.format.per.level"
FILE_BLOCK_SIZE = "file.block-size"
+ FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
# Scan options
SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f6371d2d80..eb32ebb755 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
import logging
import os
import subprocess
@@ -28,8 +27,9 @@ from packaging.version import parse
from pyarrow._fs import FileSystem
from pypaimon.common.config import OssOptions, S3Options
+from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -40,6 +40,7 @@ class FileIO:
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
scheme, netloc, _ = self.parse_location(path)
+ self.uri_reader_factory = UriReaderFactory(catalog_options)
if scheme in {"oss"}:
self.filesystem = self._initialize_oss_fs(path)
elif scheme in {"s3", "s3a", "s3n"}:
@@ -370,7 +371,7 @@ class FileIO:
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)
- def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+ def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
# Validate input constraints
if data.num_columns != 1:
@@ -399,7 +400,11 @@ class FileIO:
col_data = records_dict[field_name][i]
# Convert to appropriate type based on field type
if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
- if isinstance(col_data, bytes):
+ if blob_as_descriptor:
+ blob_descriptor = BlobDescriptor.deserialize(col_data)
+ uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
+ blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+ elif isinstance(col_data, bytes):
blob_data = BlobData(col_data)
else:
# Convert to bytes if needed
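
For context, a minimal sketch of the write path this hunk enables (not part of this commit; paths are illustrative). With blob_as_descriptor enabled, each cell of the single blob column is expected to hold a serialized BlobDescriptor pointing at external data, which write_blob resolves through UriReaderFactory before writing:

    from pathlib import Path
    import pyarrow as pa
    from pypaimon.common.file_io import FileIO
    from pypaimon.table.row.blob import BlobDescriptor

    file_io = FileIO("/tmp/warehouse", {})

    # The payload lives outside the .blob file.
    payload = b"raw blob bytes stored in an external file"
    with open("/tmp/warehouse/external_blob", "wb") as f:
        f.write(payload)

    # Each cell of the single large_binary column carries a serialized descriptor.
    descriptor = BlobDescriptor("/tmp/warehouse/external_blob", 0, len(payload))
    table = pa.table(
        [[descriptor.serialize()]],
        schema=pa.schema([pa.field("blob_col", pa.large_binary())]),
    )

    # write_blob resolves each descriptor via UriReaderFactory and writes the referenced bytes.
    file_io.write_blob(Path("/tmp/warehouse/data.blob"), table, blob_as_descriptor=True)
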
diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py
new file mode 100644
index 0000000000..823020caaf
--- /dev/null
+++ b/paimon-python/pypaimon/common/uri_reader.py
@@ -0,0 +1,171 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+
+import requests
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+ @classmethod
+ def from_http(cls) -> 'HttpUriReader':
+ return HttpUriReader()
+
+ @classmethod
+ def from_file(cls, file_io: Any) -> 'FileUriReader':
+ return FileUriReader(file_io)
+
+ @classmethod
+ def get_file_path(cls, uri: str):
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == 'file':
+ path = Path(parsed_uri.path)
+ elif parsed_uri.scheme and parsed_uri.scheme != '':
+ path = Path(parsed_uri.netloc + parsed_uri.path)
+ else:
+ path = Path(uri)
+ return path
+
+ @abstractmethod
+ def new_input_stream(self, uri: str):
+ pass
+
+
+class FileUriReader(UriReader):
+
+ def __init__(self, file_io: Any):
+ self._file_io = file_io
+
+ def new_input_stream(self, uri: str):
+ try:
+ path = self.get_file_path(uri)
+ return self._file_io.new_input_stream(path)
+ except Exception as e:
+ raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+ def new_input_stream(self, uri: str):
+ try:
+ response = requests.get(uri)
+ if response.status_code != 200:
+ raise RuntimeError(f"Failed to read HTTP URI {uri} status code {response.status_code}")
+ return io.BytesIO(response.content)
+ except Exception as e:
+ raise RuntimeError(f"Failed to read HTTP URI {uri}: {e}")
+
+
+class UriKey:
+
+ def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
+ self._scheme = scheme
+ self._authority = authority
+ self._hash = hash((self._scheme, self._authority))
+
+ @property
+ def scheme(self) -> Optional[str]:
+ return self._scheme
+
+ @property
+ def authority(self) -> Optional[str]:
+ return self._authority
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, UriKey):
+ return False
+
+ return (self._scheme == other._scheme and
+ self._authority == other._authority)
+
+ def __hash__(self) -> int:
+ return self._hash
+
+ def __repr__(self) -> str:
+ return f"UriKey(scheme='{self._scheme}', authority='{self._authority}')"
+
+
+class UriReaderFactory:
+
+ def __init__(self, catalog_options: dict) -> None:
+ self.catalog_options = catalog_options
+ self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)
+ self._readers_lock = rwlock.RWLockFair()
+
+ def create(self, input_uri: str) -> UriReader:
+ try:
+ parsed_uri = urlparse(input_uri)
+ except Exception as e:
+ raise ValueError(f"Invalid URI: {input_uri}") from e
+
+ key = UriKey(parsed_uri.scheme, parsed_uri.netloc or None)
+ rlock = self._readers_lock.gen_rlock()
+ rlock.acquire()
+ try:
+ reader = self._readers.get(key)
+ if reader is not None:
+ return reader
+ finally:
+ rlock.release()
+ wlock = self._readers_lock.gen_wlock()
+ wlock.acquire()
+ try:
+ reader = self._readers.get(key)
+ if reader is not None:
+ return reader
+ reader = self._new_reader(key, parsed_uri)
+ self._readers[key] = reader
+ return reader
+ finally:
+ wlock.release()
+
+ def _new_reader(self, key: UriKey, parsed_uri: ParseResult) -> UriReader:
+ scheme = key.scheme
+ if scheme in ('http', 'https'):
+ return UriReader.from_http()
+ try:
+ # Import FileIO here to avoid circular imports
+ from pypaimon.common.file_io import FileIO
+ uri_string = parsed_uri.geturl()
+ file_io = FileIO(uri_string, self.catalog_options)
+ return UriReader.from_file(file_io)
+ except Exception as e:
+ raise RuntimeError(f"Failed to create reader for URI {parsed_uri.geturl()}") from e
+
+ def clear_cache(self) -> None:
+ self._readers.clear()
+
+ def get_cache_size(self) -> int:
+ return len(self._readers)
+
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state['_readers_lock']
+ return state
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+ self._readers_lock = rwlock.RWLockFair()
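
For context, a rough usage sketch of the new factory (not part of this commit; URIs are illustrative). Readers are cached per (scheme, authority) pair behind a read/write lock, so repeated lookups against the same host or bucket reuse a single reader:

    from pypaimon.common.uri_reader import UriReaderFactory

    factory = UriReaderFactory({})

    http_reader = factory.create("https://example.com/blobs/a.bin")  # HttpUriReader
    file_reader = factory.create("file:///tmp/blobs/b.bin")          # FileUriReader over FileIO

    # Same scheme and authority -> the cached instance is returned.
    assert factory.create("https://example.com/blobs/c.bin") is http_reader

    # Streams are context managers, mirroring BlobRef.to_input_stream()
    # (assumes /tmp/blobs/b.bin exists).
    with file_reader.new_input_stream("file:///tmp/blobs/b.bin") as stream:
        data = stream.read()
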
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 709a0c27a3..e6846dc568 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -26,21 +26,23 @@ from pyarrow import RecordBatch
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
-from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType
+from pypaimon.table.row.blob import Blob
from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
class FormatBlobReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
- full_fields: List[DataField], push_down_predicate: Any):
+ full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
self._file_io = file_io
self._file_path = file_path
self._push_down_predicate = push_down_predicate
+ self._blob_as_descriptor = blob_as_descriptor
# Get file size
- self._file_size = file_io.get_file_size(file_path)
+ self._file_size = file_io.get_file_size(Path(file_path))
# Initialize the low-level blob format reader
self.file_path = file_path
@@ -66,7 +68,10 @@ class FormatBlobReader(RecordBatchReader):
if self.returned:
return None
self.returned = True
- batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
+ batch_iterator = BlobRecordIterator(
+ self._file_io, self.file_path, self.blob_lengths,
+ self.blob_offsets, self._fields[0]
+ )
self._blob_iterator = iter(batch_iterator)
# Collect records for this batch
@@ -75,22 +80,16 @@ class FormatBlobReader(RecordBatchReader):
try:
while True:
- # Get next blob record
blob_row = next(self._blob_iterator)
- # Check if first read returns None, stop immediately
if blob_row is None:
break
-
- # Extract blob data from the row
- blob = blob_row.values[0] # Blob files have single blob field
-
- # Convert blob to appropriate format for each requested field
+ blob = blob_row.values[0]
for field_name in self._fields:
- # For blob files, all fields should contain blob data
- if isinstance(blob, Blob):
- blob_data = blob.to_data()
+ blob_descriptor = blob.to_descriptor()
+ if self._blob_as_descriptor:
+ blob_data = blob_descriptor.serialize()
else:
- blob_data = bytes(blob) if blob is not None else None
+ blob_data = blob.to_data()
pydict_data[field_name].append(blob_data)
records_in_batch += 1
@@ -162,7 +161,9 @@ class BlobRecordIterator:
MAGIC_NUMBER_SIZE = 4
METADATA_OVERHEAD = 16
- def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
+ def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int],
+ blob_offsets: List[int], field_name: str):
+ self.file_io = file_io
self.file_path = file_path
self.field_name = field_name
self.blob_lengths = blob_lengths
@@ -175,25 +176,14 @@ class BlobRecordIterator:
def __next__(self) -> GenericRow:
if self.current_position >= len(self.blob_lengths):
raise StopIteration
-
# Create blob reference for the current blob
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number
blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
-
- # Create BlobDescriptor for this blob
- descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
- blob = BlobRef(descriptor)
-
+ blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length)
self.current_position += 1
-
- # Return as GenericRow with single blob field
- from pypaimon.schema.data_types import DataField, AtomicType
- from pypaimon.table.row.row_kind import RowKind
-
fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
return GenericRow([blob], fields, RowKind.INSERT)
def returned_position(self) -> int:
- """Get current position in the iterator."""
return self.current_position
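
For context, a condensed sketch of the read path (not part of this commit; paths are illustrative). With blob_as_descriptor=True the reader returns serialized BlobDescriptors that point back into the .blob file; with False it returns the blob bytes themselves:

    from pypaimon.common.file_io import FileIO
    from pypaimon.read.reader.format_blob_reader import FormatBlobReader
    from pypaimon.schema.data_types import AtomicType, DataField
    from pypaimon.table.row.blob import BlobDescriptor

    file_io = FileIO("/tmp/warehouse", {})
    fields = [DataField(0, "blob_col", AtomicType("BLOB"))]

    reader = FormatBlobReader(
        file_io=file_io,
        file_path="/tmp/warehouse/data.blob",
        read_fields=["blob_col"],
        full_fields=fields,
        push_down_predicate=None,
        blob_as_descriptor=True,
    )
    batch = reader.read_arrow_batch()
    # Each cell is a descriptor with uri/offset/length pointing into data.blob.
    descriptor = BlobDescriptor.deserialize(batch.column(0)[0].as_py())
    reader.close()
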
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 81ebdd86f8..372679ed63 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -82,8 +82,9 @@ class SplitRead(ABC):
format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
self.read_fields, self.push_down_predicate)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+ blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
- self.read_fields, self.push_down_predicate)
+ self.read_fields, self.push_down_predicate, blob_as_descriptor)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
self._get_final_read_data_fields(), self.push_down_predicate)
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
index b9d8cc0e38..b92ddb3e82 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -19,6 +19,9 @@
import io
from abc import ABC, abstractmethod
from typing import Optional, Union
+from urllib.parse import urlparse
+
+from pypaimon.common.uri_reader import UriReader, FileUriReader
class BlobDescriptor:
@@ -144,21 +147,33 @@ class Blob(ABC):
@staticmethod
def from_local(file: str) -> 'Blob':
- return Blob.from_file(file)
+ # Import FileIO locally to avoid circular imports
+ from pypaimon.common.file_io import FileIO
+
+ parsed = urlparse(file)
+ if parsed.scheme == "file":
+ file_uri = file
+ else:
+ file_uri = f"file://{file}"
+ file_io = FileIO(file_uri, {})
+ uri_reader = FileUriReader(file_io)
+ descriptor = BlobDescriptor(file, 0, -1)
+ return Blob.from_descriptor(uri_reader, descriptor)
@staticmethod
def from_http(uri: str) -> 'Blob':
descriptor = BlobDescriptor(uri, 0, -1)
- return BlobRef(descriptor)
+ return BlobRef(UriReader.from_http(), descriptor)
@staticmethod
- def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob':
- descriptor = BlobDescriptor(file, offset, length)
- return BlobRef(descriptor)
+ def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob':
+ uri_reader = FileUriReader(file_io)
+ descriptor = BlobDescriptor(file_path, offset, length)
+ return Blob.from_descriptor(uri_reader, descriptor)
@staticmethod
- def from_descriptor(descriptor: BlobDescriptor) -> 'Blob':
- return BlobRef(descriptor)
+ def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) ->
'Blob':
+ return BlobRef(uri_reader, descriptor)
class BlobData(Blob):
@@ -199,7 +214,8 @@ class BlobData(Blob):
class BlobRef(Blob):
- def __init__(self, descriptor: BlobDescriptor):
+ def __init__(self, uri_reader: UriReader, descriptor: BlobDescriptor):
+ self._uri_reader = uri_reader
self._descriptor = descriptor
def to_data(self) -> bytes:
@@ -216,27 +232,14 @@ class BlobRef(Blob):
uri = self._descriptor.uri
offset = self._descriptor.offset
length = self._descriptor.length
-
- if uri.startswith('http://') or uri.startswith('https://'):
- raise NotImplementedError("HTTP blob reading not implemented yet")
- elif uri.startswith('file://') or '://' not in uri:
- file_path = uri.replace('file://', '') if uri.startswith('file://') else uri
-
- try:
- with open(file_path, 'rb') as f:
- if offset > 0:
- f.seek(offset)
-
- if length == -1:
- data = f.read()
- else:
- data = f.read(length)
-
- return io.BytesIO(data)
- except Exception as e:
- raise IOError(f"Failed to read file {file_path}: {e}")
- else:
- raise ValueError(f"Unsupported URI scheme: {uri}")
+ with self._uri_reader.new_input_stream(uri) as input_stream:
+ if offset > 0:
+ input_stream.seek(offset)
+ if length == -1:
+ data = input_stream.read()
+ else:
+ data = input_stream.read(length)
+ return io.BytesIO(data)
def __eq__(self, other) -> bool:
if not isinstance(other, BlobRef):
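
For context, a minimal sketch of the reworked Blob API (not part of this commit; paths are illustrative). A BlobRef now pairs a BlobDescriptor with a UriReader and delegates all I/O to it instead of opening files directly:

    from pypaimon.common.file_io import FileIO
    from pypaimon.common.uri_reader import FileUriReader
    from pypaimon.table.row.blob import Blob, BlobDescriptor

    file_io = FileIO("file:///tmp/warehouse", {})
    uri_reader = FileUriReader(file_io)

    # Descriptor is (uri, offset, length); length == -1 means "read to the end".
    descriptor = BlobDescriptor("/tmp/warehouse/external_blob", 0, -1)
    blob = Blob.from_descriptor(uri_reader, descriptor)

    data = blob.to_data()        # bytes fetched through the UriReader
    same = blob.to_descriptor()  # returns the original descriptor
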
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index b8ed65e5bb..b66c1dec9a 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -92,7 +92,8 @@ class BlobTest(unittest.TestCase):
def test_from_file_with_offset_and_length(self):
"""Test Blob.from_file() method with offset and length."""
- blob = Blob.from_file(self.file, 0, 4)
+ file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {})
+ blob = Blob.from_file(file_io, self.file, 0, 4)
# Verify it returns a BlobRef instance
self.assertIsInstance(blob, BlobRef)
@@ -100,16 +101,6 @@ class BlobTest(unittest.TestCase):
# Verify the data matches (first 4 bytes: "test")
self.assertEqual(blob.to_data(), b"test")
- def test_from_file_full(self):
- """Test Blob.from_file() method without offset and length."""
- blob = Blob.from_file(self.file)
-
- # Verify it returns a BlobRef instance
- self.assertIsInstance(blob, BlobRef)
-
- # Verify the data matches
- self.assertEqual(blob.to_data(), b"test data")
-
def test_from_http(self):
"""Test Blob.from_http() method."""
uri = "http://example.com/file.txt"
@@ -197,7 +188,8 @@ class BlobTest(unittest.TestCase):
self.assertIsInstance(blob_ref, Blob)
# from_file should return BlobRef
- blob_file = Blob.from_file(self.file)
+ file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {})
+ blob_file = Blob.from_file(file_io, self.file, 0, os.path.getsize(self.file))
self.assertIsInstance(blob_file, BlobRef)
self.assertIsInstance(blob_file, Blob)
@@ -505,7 +497,7 @@ class BlobTest(unittest.TestCase):
descriptor = BlobDescriptor(self.file, 0, -1)
# Create BlobRef from descriptor
- blob_ref = BlobRef(descriptor)
+ blob_ref = Blob.from_local(self.file)
# Verify descriptor is preserved
returned_descriptor = blob_ref.to_descriptor()
@@ -554,11 +546,6 @@ class BlobEndToEndTest(unittest.TestCase):
pass
def test_blob_end_to_end(self):
- from pypaimon.schema.data_types import DataField, AtomicType
- from pypaimon.table.row.blob import BlobData
- from pypaimon.common.file_io import FileIO
- from pathlib import Path
-
# Set up file I/O
file_io = FileIO(self.temp_dir, {})
@@ -576,7 +563,7 @@ class BlobEndToEndTest(unittest.TestCase):
schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
table = pa.table([blob_data], schema=schema)
blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob")
- file_io.write_blob(blob_files[blob_field_name], table)
+ file_io.write_blob(blob_files[blob_field_name], table, False)
self.assertTrue(file_io.exists(blob_files[blob_field_name]))
# ========== Step 3: Read Data and Check Data ==========
@@ -587,7 +574,8 @@ class BlobEndToEndTest(unittest.TestCase):
file_path=str(file_path),
read_fields=[field_name],
full_fields=read_fields,
- push_down_predicate=None
+ push_down_predicate=None,
+ blob_as_descriptor=False
)
# Read data
@@ -693,7 +681,7 @@ class BlobEndToEndTest(unittest.TestCase):
# Should throw RuntimeError for multiple columns
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(multi_column_file, multi_column_table)
+ file_io.write_blob(multi_column_file, multi_column_table, False)
self.assertIn("single column", str(context.exception))
# Test that FileIO.write_blob rejects null values
@@ -704,7 +692,7 @@ class BlobEndToEndTest(unittest.TestCase):
# Should throw RuntimeError for null values
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(null_file, null_table)
+ file_io.write_blob(null_file, null_table, False)
self.assertIn("null values", str(context.exception))
# ========== Test FormatBlobReader with complex type schema ==========
@@ -714,7 +702,7 @@ class BlobEndToEndTest(unittest.TestCase):
valid_table = pa.table([valid_blob_data], schema=valid_schema)
valid_blob_file = Path(self.temp_dir) / "valid_blob.blob"
- file_io.write_blob(valid_blob_file, valid_table)
+ file_io.write_blob(valid_blob_file, valid_table, False)
# Try to read with complex type field definition - this should fail
# because FormatBlobReader tries to create PyArrow schema with complex types
@@ -728,7 +716,8 @@ class BlobEndToEndTest(unittest.TestCase):
file_path=str(valid_blob_file),
read_fields=["valid_blob"],
full_fields=complex_read_fields,
- push_down_predicate=None
+ push_down_predicate=None,
+ blob_as_descriptor=False
)
# Reading should fail because the schema expects complex type but data is atomic
@@ -761,7 +750,7 @@ class BlobEndToEndTest(unittest.TestCase):
valid_table = pa.table([valid_blob_data], schema=valid_schema)
header_test_file = Path(self.temp_dir) / "header_test.blob"
- file_io.write_blob(header_test_file, valid_table)
+ file_io.write_blob(header_test_file, valid_table, False)
# Read the file and corrupt the header (last 5 bytes: index_length + version)
with open(header_test_file, 'rb') as f:
@@ -785,7 +774,8 @@ class BlobEndToEndTest(unittest.TestCase):
file_path=str(corrupted_header_file),
read_fields=["test_blob"],
full_fields=fields,
- push_down_predicate=None
+ push_down_predicate=None,
+ blob_as_descriptor=False
)
self.assertIn("Unsupported blob file version", str(context.exception))
@@ -798,7 +788,7 @@ class BlobEndToEndTest(unittest.TestCase):
large_table = pa.table([large_blob_data], schema=large_schema)
full_blob_file = Path(self.temp_dir) / "full_blob.blob"
- file_io.write_blob(full_blob_file, large_table)
+ file_io.write_blob(full_blob_file, large_table, False)
# Read the full file and truncate it in the middle
with open(full_blob_file, 'rb') as f:
@@ -819,7 +809,8 @@ class BlobEndToEndTest(unittest.TestCase):
file_path=str(truncated_file),
read_fields=["large_blob"],
full_fields=fields,
- push_down_predicate=None
+ push_down_predicate=None,
+ blob_as_descriptor=False
)
# Should detect truncation/incomplete data (either invalid header or invalid version)
self.assertTrue(
@@ -835,7 +826,7 @@ class BlobEndToEndTest(unittest.TestCase):
zero_table = pa.table([zero_blob_data], schema=zero_schema)
zero_blob_file = Path(self.temp_dir) / "zero_length.blob"
- file_io.write_blob(zero_blob_file, zero_table)
+ file_io.write_blob(zero_blob_file, zero_table, False)
# Verify file was created
self.assertTrue(file_io.exists(zero_blob_file))
@@ -849,7 +840,8 @@ class BlobEndToEndTest(unittest.TestCase):
file_path=str(zero_blob_file),
read_fields=["zero_blob"],
full_fields=zero_fields,
- push_down_predicate=None
+ push_down_predicate=None,
+ blob_as_descriptor=False
)
zero_batch = zero_reader.read_arrow_batch()
@@ -876,7 +868,7 @@ class BlobEndToEndTest(unittest.TestCase):
large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema)
large_sim_file = Path(self.temp_dir) / "large_simulation.blob"
- file_io.write_blob(large_sim_file, large_sim_table)
+ file_io.write_blob(large_sim_file, large_sim_table, False)
# Verify large file was written
large_sim_size = file_io.get_file_size(large_sim_file)
@@ -889,7 +881,8 @@ class BlobEndToEndTest(unittest.TestCase):
file_path=str(large_sim_file),
read_fields=["large_sim_blob"],
full_fields=large_sim_fields,
- push_down_predicate=None
+ push_down_predicate=None,
+ blob_as_descriptor=False
)
large_sim_batch = large_sim_reader.read_arrow_batch()
@@ -947,7 +940,7 @@ class BlobEndToEndTest(unittest.TestCase):
# Should reject multi-field table
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(multi_field_file, multi_field_table)
+ file_io.write_blob(multi_field_file, multi_field_table, False)
self.assertIn("single column", str(context.exception))
# Test that blob format rejects non-binary field types
@@ -958,7 +951,7 @@ class BlobEndToEndTest(unittest.TestCase):
# Should reject non-binary field
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(non_binary_file, non_binary_table)
+ file_io.write_blob(non_binary_file, non_binary_table, False)
# Should fail due to type conversion issues (non-binary field can't be converted to BLOB)
self.assertTrue(
"large_binary" in str(context.exception) or
@@ -975,9 +968,99 @@ class BlobEndToEndTest(unittest.TestCase):
# Should reject null values
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(null_file, null_table)
+ file_io.write_blob(null_file, null_table, False)
self.assertIn("null values", str(context.exception))
+ def test_blob_end_to_end_with_descriptor(self):
+ # Set up file I/O
+ file_io = FileIO(self.temp_dir, {})
+
+ # ========== Step 1: Write data to local file ==========
+ # Create test data and write it to a local file
+ test_content = b'This is test blob content stored in an external file for descriptor testing.'
+ # Write the test content to a local file
+ local_data_file = Path(self.temp_dir) / "external_blob"
+ with open(local_data_file, 'wb') as f:
+ f.write(test_content)
+ # Verify the file was created and has the correct content
+ self.assertTrue(local_data_file.exists())
+ with open(local_data_file, 'rb') as f:
+ written_content = f.read()
+ self.assertEqual(written_content, test_content)
+
+ # ========== Step 2: Use this file as blob descriptor ==========
+ # Create a BlobDescriptor pointing to the local file
+ blob_descriptor = BlobDescriptor(
+ uri=str(local_data_file),
+ offset=0,
+ length=len(test_content)
+ )
+ # Serialize the descriptor to bytes (this is what would be stored in the blob column)
+ descriptor_bytes = blob_descriptor.serialize()
+ self.assertIsInstance(descriptor_bytes, bytes)
+ self.assertGreater(len(descriptor_bytes), 0)
+
+ # Create PyArrow table with the serialized descriptor
+ blob_field_name = "blob_descriptor_field"
+ schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+ table = pa.table([[descriptor_bytes]], schema=schema)
+
+ # Write the blob file with blob_as_descriptor=True
+ blob_file_path = Path(self.temp_dir) / "descriptor_blob.blob"
+ file_io.write_blob(blob_file_path, table, blob_as_descriptor=True)
+ # Verify the blob file was created
+ self.assertTrue(file_io.exists(blob_file_path))
+ file_size = file_io.get_file_size(blob_file_path)
+ self.assertGreater(file_size, 0)
+
+ # ========== Step 3: Read data and check ==========
+ # Define schema for reading
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(blob_file_path),
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=True
+ )
+
+ # Read the data with blob_as_descriptor=True (should return a descriptor)
+ batch = reader.read_arrow_batch()
+ self.assertIsNotNone(batch)
+ self.assertEqual(batch.num_rows, 1)
+ self.assertEqual(batch.num_columns, 1)
+
+ read_blob_bytes = batch.column(0)[0].as_py()
+ self.assertIsInstance(read_blob_bytes, bytes)
+
+ # Deserialize the returned descriptor
+ returned_descriptor = BlobDescriptor.deserialize(read_blob_bytes)
+
+ # The returned descriptor should point to the blob file (simplified implementation)
+ # because the current implementation creates a descriptor pointing to the blob file location
+ self.assertEqual(returned_descriptor.uri, str(blob_file_path))
+ self.assertGreater(returned_descriptor.offset, 0) # Should have some offset in the blob file
+
+ reader.close()
+
+ reader_content = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(blob_file_path),
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False
+ )
+ batch_content = reader_content.read_arrow_batch()
+ self.assertIsNotNone(batch_content)
+ self.assertEqual(batch_content.num_rows, 1)
+ read_content_bytes = batch_content.column(0)[0].as_py()
+ self.assertIsInstance(read_content_bytes, bytes)
+ # When blob_as_descriptor=False, we should get the actual file content
+ self.assertEqual(read_content_bytes, test_content)
+ reader_content.close()
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/uri_reader_factory_test.py b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
new file mode 100644
index 0000000000..12088d746c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
@@ -0,0 +1,227 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import tempfile
+import unittest
+from pathlib import Path
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.uri_reader import UriReaderFactory, HttpUriReader, FileUriReader, UriReader
+
+
+class MockFileIO:
+ """Mock FileIO for testing."""
+
+ def __init__(self, file_io: FileIO):
+ self._file_io = file_io
+
+ def get_file_size(self, path: str) -> int:
+ """Get file size."""
+ return self._file_io.get_file_size(Path(path))
+
+ def new_input_stream(self, path: Path):
+ """Create new input stream for reading."""
+ return self._file_io.new_input_stream(path)
+
+
+class UriReaderFactoryTest(unittest.TestCase):
+
+ def setUp(self):
+ self.factory = UriReaderFactory({})
+ self.temp_dir = tempfile.mkdtemp()
+ self.temp_file = os.path.join(self.temp_dir, "test.txt")
+ with open(self.temp_file, 'w') as f:
+ f.write("test content")
+
+ def tearDown(self):
+ """Clean up temporary files."""
+ try:
+ if os.path.exists(self.temp_file):
+ os.remove(self.temp_file)
+ os.rmdir(self.temp_dir)
+ except OSError:
+ pass # Ignore cleanup errors
+
+ def test_create_http_uri_reader(self):
+ """Test creating HTTP URI reader."""
+ reader = self.factory.create("http://example.com/file.txt")
+ self.assertIsInstance(reader, HttpUriReader)
+
+ def test_create_https_uri_reader(self):
+ """Test creating HTTPS URI reader."""
+ reader = self.factory.create("https://example.com/file.txt")
+ self.assertIsInstance(reader, HttpUriReader)
+
+ def test_create_file_uri_reader(self):
+ """Test creating file URI reader."""
+ reader = self.factory.create(f"file://{self.temp_file}")
+ self.assertIsInstance(reader, FileUriReader)
+
+ def test_create_uri_reader_with_authority(self):
+ """Test creating URI readers with different authorities."""
+ reader1 = self.factory.create("http://my_bucket1/path/to/file.txt")
+ reader2 = self.factory.create("http://my_bucket2/path/to/file.txt")
+
+ # Different authorities should create different readers
+ self.assertNotEqual(reader1, reader2)
+ self.assertIsNot(reader1, reader2)
+
+ def test_cached_readers_with_same_scheme_and_authority(self):
+ """Test that readers with same scheme and authority are cached."""
+ reader1 = self.factory.create("http://my_bucket/path/to/file1.txt")
+ reader2 = self.factory.create("http://my_bucket/path/to/file2.txt")
+
+ # Same scheme and authority should return the same cached reader
+ self.assertIs(reader1, reader2)
+
+ def test_cached_readers_with_null_authority(self):
+ """Test that readers with null authority are cached."""
+ reader1 = self.factory.create(f"file://{self.temp_file}")
+ reader2 = self.factory.create(f"file://{self.temp_dir}/another_file.txt")
+
+ # Same scheme with null authority should return the same cached reader
+ self.assertIs(reader1, reader2)
+
+ def test_create_uri_reader_with_local_path(self):
+ """Test creating URI reader with local path (no scheme)."""
+ reader = self.factory.create(self.temp_file)
+ self.assertIsInstance(reader, FileUriReader)
+
+ def test_cache_size_tracking(self):
+ """Test that cache size is tracked correctly."""
+ initial_size = self.factory.get_cache_size()
+
+ # Create readers with different schemes/authorities
+ self.factory.create("http://example.com/file.txt")
+ self.assertEqual(self.factory.get_cache_size(), initial_size + 1)
+
+ self.factory.create("https://example.com/file.txt")
+ self.assertEqual(self.factory.get_cache_size(), initial_size + 2)
+
+ self.factory.create(f"file://{self.temp_file}")
+ self.assertEqual(self.factory.get_cache_size(), initial_size + 3)
+
+ # Same scheme/authority should not increase cache size
+ self.factory.create("http://example.com/another_file.txt")
+ self.assertEqual(self.factory.get_cache_size(), initial_size + 3)
+
+ def test_uri_reader_functionality(self):
+ """Test that created URI readers actually work."""
+ # Test file URI reader
+ reader = self.factory.create(f"file://{self.temp_file}")
+ stream = reader.new_input_stream(self.temp_file)
+ content = stream.read().decode('utf-8')
+ self.assertEqual(content, "test content")
+ stream.close()
+
+ def test_invalid_uri_handling(self):
+ """Test handling of invalid URIs."""
+ # This should not raise an exception as urlparse is quite permissive
+ # But we can test edge cases
+ reader = self.factory.create("")
+ self.assertIsInstance(reader, (HttpUriReader, FileUriReader))
+
+ def test_uri_key_equality(self):
+ """Test UriKey equality and hashing behavior."""
+ from pypaimon.common.uri_reader import UriKey
+
+ key1 = UriKey("http", "example.com")
+ key2 = UriKey("http", "example.com")
+ key3 = UriKey("https", "example.com")
+ key4 = UriKey("http", "other.com")
+
+ # Same scheme and authority should be equal
+ self.assertEqual(key1, key2)
+ self.assertEqual(hash(key1), hash(key2))
+
+ # Different scheme or authority should not be equal
+ self.assertNotEqual(key1, key3)
+ self.assertNotEqual(key1, key4)
+
+ # Test with None values
+ key_none1 = UriKey(None, None)
+ key_none2 = UriKey(None, None)
+ self.assertEqual(key_none1, key_none2)
+
+ def test_uri_key_string_representation(self):
+ """Test UriKey string representation."""
+ from pypaimon.common.uri_reader import UriKey
+
+ key = UriKey("http", "example.com")
+ repr_str = repr(key)
+ self.assertIn("http", repr_str)
+ self.assertIn("example.com", repr_str)
+
+ def test_thread_safety_simulation(self):
+ """Test thread safety by creating multiple readers concurrently."""
+ import threading
+ import time
+
+ results = []
+
+ def create_reader():
+ reader = self.factory.create("http://example.com/file.txt")
+ results.append(reader)
+ time.sleep(0.01) # Small delay to increase chance of race conditions
+
+ # Create multiple threads
+ threads = []
+ for _ in range(10):
+ thread = threading.Thread(target=create_reader)
+ threads.append(thread)
+ thread.start()
+
+ # Wait for all threads to complete
+ for thread in threads:
+ thread.join()
+
+ # All results should be the same cached reader
+ first_reader = results[0]
+ for reader in results[1:]:
+ self.assertIs(reader, first_reader)
+
+ def test_different_file_schemes(self):
+ """Test different file-based schemes."""
+ # Test absolute path without scheme
+ reader1 = self.factory.create(os.path.abspath(self.temp_file))
+ self.assertIsInstance(reader1, FileUriReader)
+
+ # Test file:// scheme
+ reader2 = self.factory.create(f"file://{self.temp_file}")
+ self.assertIsInstance(reader2, FileUriReader)
+
+ # Different schemes (empty vs "file") should create different cache entries
+ self.assertIsNot(reader1, reader2)
+
+ # But same scheme should be cached
+ reader3 = self.factory.create(f"file://{self.temp_dir}/another_file.txt")
+ self.assertIs(reader2, reader3) # Same file:// scheme
+
+ def test_get_file_path_with_file_uri(self):
+ file_uri = f"file://{self.temp_file}"
+ path = UriReader.get_file_path(file_uri)
+ self.assertEqual(str(path), self.temp_file)
+ oss_file_path = "bucket/tmp/another_file.txt"
+ file_uri = f"oss://{oss_file_path}"
+ path = UriReader.get_file_path(file_uri)
+ self.assertEqual(str(path), oss_file_path)
+ path = UriReader.get_file_path(self.temp_file)
+ self.assertEqual(str(path), self.temp_file)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index ad6e327c89..0a063fb2f4 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -58,6 +58,7 @@ class DataWriter(ABC):
self.pending_data: Optional[pa.Table] = None
self.committed_files: List[DataFileMeta] = []
self.write_cols = write_cols
+ self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
def write(self, data: pa.RecordBatch):
processed_data = self._process_data(data)
@@ -115,7 +116,7 @@ class DataWriter(ABC):
elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
self.file_io.write_avro(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_BLOB:
- self.file_io.write_blob(file_path, data)
+ self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
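
For context, the option key both sides now consult (not part of this commit; how the options dict reaches DataWriter and SplitRead depends on the table API and is not shown in this diff). The writer reads it from its options and forwards it to write_blob; the reader does the same when constructing a FormatBlobReader:

    from pypaimon.common.core_options import CoreOptions

    # CoreOptions.FILE_BLOB_AS_DESCRIPTOR resolves to the string "blob-as-descriptor".
    table_options = {CoreOptions.FILE_BLOB_AS_DESCRIPTOR: True}

    # Writer side: options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) -> FileIO.write_blob(...)
    # Reader side: self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) -> FormatBlobReader(...)
    blob_as_descriptor = table_options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
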