Yicong-Huang commented on code in PR #4100:
URL: https://github.com/apache/texera/pull/4100#discussion_r2594449572


##########
amber/src/main/python/core/models/schema/large_binary.py:
##########
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+largebinary represents a reference to a large object stored externally (e.g., 
S3).
+This is a schema type class used throughout the system for handling
+BIG_OBJECT attribute types.
+"""
+
+from typing import Optional
+from urllib.parse import urlparse
+
+
+class largebinary:

Review Comment:
   Let's use `LargeBinary` for class name.



##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Internal largebinary manager for S3 operations.
+
+Users should not interact with this module directly. Use largebinary() 
constructor
+and LargeBinaryInputStream/LargeBinaryOutputStream instead.
+"""
+
+import logging
+import time
+import uuid
+from core.storage.storage_config import StorageConfig
+
+logger = logging.getLogger(__name__)
+
+
+class LargeBinaryManager:

Review Comment:
   is this a singleton? if so please follow singleton pattern by adding 
`__new__(cls)` guard to ensure there is only one copy of the class instance.



##########
amber/src/main/python/core/models/schema/large_binary.py:
##########


Review Comment:
   can you move this to `core/models/type/`? I think that's a better location. 



##########
amber/src/main/python/core/models/schema/arrow_schema_utils.py:
##########
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Utilities for converting between Arrow schemas and Amber schemas,
+handling LARGE_BINARY metadata preservation.
+"""
+
+import pyarrow as pa
+from typing import Mapping
+
+from core.models.schema.attribute_type import (
+    AttributeType,
+    FROM_ARROW_MAPPING,
+    TO_ARROW_MAPPING,
+)
+
+# Metadata key used to mark LARGE_BINARY fields in Arrow schemas
+TEXERA_TYPE_METADATA_KEY = b"texera_type"
+LARGE_BINARY_METADATA_VALUE = b"LARGE_BINARY"
+
+
+def detect_attribute_type_from_arrow_field(field: pa.Field) -> AttributeType:
+    """
+    Detects the AttributeType from an Arrow field, checking metadata for 
LARGE_BINARY.
+
+    :param field: PyArrow field that may contain metadata
+    :return: The detected AttributeType
+    """
+    # Check metadata for LARGE_BINARY type
+    # (can be stored by either Scala ArrowUtils or Python)
+    is_large_binary = (
+        field.metadata
+        and field.metadata.get(TEXERA_TYPE_METADATA_KEY) == 
LARGE_BINARY_METADATA_VALUE
+    )
+
+    if is_large_binary:
+        return AttributeType.LARGE_BINARY
+    else:
+        return FROM_ARROW_MAPPING[field.type.id]
+
+
+def create_arrow_field_with_metadata(
+    attr_name: str, attr_type: AttributeType
+) -> pa.Field:
+    """
+    Creates a PyArrow field with appropriate metadata for the given 
AttributeType.
+
+    :param attr_name: Name of the attribute
+    :param attr_type: The AttributeType
+    :return: PyArrow field with metadata if needed
+    """
+    metadata = (
+        {TEXERA_TYPE_METADATA_KEY: LARGE_BINARY_METADATA_VALUE}
+        if attr_type == AttributeType.LARGE_BINARY
+        else None
+    )
+
+    return pa.field(attr_name, TO_ARROW_MAPPING[attr_type], metadata=metadata)

Review Comment:
   you see this method only involves `AttributeType` but not `Schema`. so it's 
better to be moved elsewhere. I recall we have a type utils. can you check and 
move those methods there?



##########
amber/src/main/python/pytexera/storage/large_binary_input_stream.py:
##########
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+LargeBinaryInputStream for reading largebinary data from S3.
+
+Usage:
+    with LargeBinaryInputStream(large_binary) as stream:
+        content = stream.read()
+"""
+
+from typing import BinaryIO, Optional
+from functools import wraps
+from io import IOBase
+from core.models.schema.large_binary import largebinary
+
+
+def _require_open(func):
+    """Decorator to ensure stream is open before reading operations."""
+
+    @wraps(func)
+    def wrapper(self, *args, **kwargs):
+        if self._closed:
+            raise ValueError("I/O operation on closed stream")
+        if self._underlying is None:
+            self._lazy_init()
+        return func(self, *args, **kwargs)
+
+    return wrapper
+
+
+class LargeBinaryInputStream(IOBase):
+    """
+    InputStream for reading largebinary data from S3.
+
+    Lazily downloads from S3 on first read. Supports context manager and 
iteration.
+    """
+
+    def __init__(self, large_binary: largebinary):
+        """Initialize stream for reading the given largebinary."""
+        super().__init__()
+        if large_binary is None:
+            raise ValueError("largebinary cannot be None")
+        self._large_binary = large_binary
+        self._underlying: Optional[BinaryIO] = None
+        self._closed = False
+
+    def _lazy_init(self):
+        """Download from S3 on first read operation."""
+        from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+        s3 = LargeBinaryManager._get_s3_client()
+        response = s3.get_object(
+            Bucket=self._large_binary.get_bucket_name(),
+            Key=self._large_binary.get_object_key(),
+        )
+        self._underlying = response["Body"]
+
+    @_require_open
+    def read(self, n: int = -1) -> bytes:
+        """Read and return up to n bytes (-1 reads all)."""
+        return self._underlying.read(n)
+
+    @_require_open
+    def readline(self, size: int = -1) -> bytes:
+        """Read and return one line from the stream."""
+        return self._underlying.readline(size)
+
+    @_require_open
+    def readlines(self, hint: int = -1) -> list[bytes]:
+        """Read and return a list of lines from the stream."""
+        return self._underlying.readlines(hint)
+
+    def readable(self) -> bool:
+        """Return True if the stream can be read from."""
+        return not self._closed
+
+    def seekable(self) -> bool:
+        """Return False - this stream does not support seeking."""
+        return False
+
+    @property
+    def closed(self) -> bool:
+        """Return True if the stream is closed."""
+        return self._closed
+
+    def close(self) -> None:
+        """Close the stream and release resources."""
+        if not self._closed:
+            self._closed = True
+            if self._underlying is not None:
+                self._underlying.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()

Review Comment:
   great that we are using a context manager. 



##########
amber/src/main/python/core/models/schema/attribute_type.py:
##########
@@ -83,6 +87,7 @@ class AttributeType(Enum):
     AttributeType.BOOL: bool,
     AttributeType.BINARY: bytes,
     AttributeType.TIMESTAMP: datetime.datetime,
+    AttributeType.LARGE_BINARY: largebinary,

Review Comment:
   i think we should map it to arrow string type. if map to `largebinary` type, 
then arrow will do optimizations, but our actual data is only the pointer, 
which means the optimization maybe a backfire.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to