HonahX commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1435031878
########## pyiceberg/table/__init__.py: ########## @@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_datafile_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: + return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: + # Mimics the behavior in Java: + # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: + from pyiceberg.io.pyarrow import write_file + + write_uuid = uuid.uuid4() + counter = itertools.count(0) + + # This is an iter, so we don't have to materialize everything every time + # This will be more relevant when we start doing partitioned writes + yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: + _operation: Operation + _table: Table + _snapshot_id: int + _parent_snapshot_id: Optional[int] + _added_datafiles: List[DataFile] + _existing_datafiles: List[DataFile] + _commit_uuid: uuid.UUID + + def __init__(self, operation: Operation, table: Table, snapshot_id: int, parent_snapshot_id: Optional[int]) -> None: + self._operation = operation 
+ self._table = table + self._snapshot_id = snapshot_id + self._parent_snapshot_id = parent_snapshot_id Review Comment: Instead of passing `parent_snapshot_id` as an argument, I wonder if we could replace it by a `branch_name: str = "main"`: ```python def __init__(self, operation: Operation, table: Table, snapshot_id: int, branch_name: str = "main") -> None: ... self._parent_snapshot = self._table.current_snapshot() # only support main in the first version self._parent_snapshot_id = self._parent_snapshot.snapshot_id if self._parent_snapshot else None ``` In this way we only do `snapshot_by_id` once and can avoid duplicate code and `None` checks in other places. When adding branch support later, we could replace `self._table.current_snapshot()` by something similar to https://github.com/apache/iceberg/blob/a4d47567e1fef44f4443250537f09dc73a1f7583/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L493-L503 Does this sound reasonable to you? Maybe this can be discussed later in the PR for branch support.
########## pyiceberg/table/__init__.py: ########## @@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_datafile_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: + return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: + # Mimics the behavior in Java: + # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: + from pyiceberg.io.pyarrow import write_file + + write_uuid = uuid.uuid4() + counter = itertools.count(0) + + # This is an iter, so we don't have to materialize everything every time + # This will be more relevant when we start doing partitioned writes + yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: + _operation: Operation + _table: Table + _snapshot_id: int + _parent_snapshot_id: Optional[int] + _added_datafiles: List[DataFile] + _existing_datafiles: List[DataFile] + _commit_uuid: uuid.UUID + + def __init__(self, operation: Operation, table: Table, snapshot_id: int, parent_snapshot_id: Optional[int]) -> None: + self._operation = operation 
+ self._table = table + self._snapshot_id = snapshot_id + self._parent_snapshot_id = parent_snapshot_id + self._added_datafiles = [] + self._existing_datafiles = [] + self._commit_uuid = uuid.uuid4() + + def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: + if added: + self._added_datafiles.append(data_file) + else: + self._existing_datafiles.append(data_file) + return self + + def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile: Review Comment: Looks like this is not used. Out of curiosity, will there be any use case of this in the future? I think we choose to append existing data files instead of copying existing manifest files ########## pyiceberg/io/pyarrow.py: ########## @@ -1447,18 +1452,15 @@ def parquet_path_to_id_mapping( def fill_parquet_file_metadata( - df: DataFile, + datafile: DataFile, Review Comment: ```suggestion data_file: DataFile, ``` Shall we rename this to `data_file`? ########## pyiceberg/table/__init__.py: ########## @@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_datafile_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: + return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: + # Mimics the behavior in Java: + # 
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: + from pyiceberg.io.pyarrow import write_file + + write_uuid = uuid.uuid4() + counter = itertools.count(0) + + # This is an iter, so we don't have to materialize everything every time + # This will be more relevant when we start doing partitioned writes + yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: + _operation: Operation + _table: Table + _snapshot_id: int + _parent_snapshot_id: Optional[int] + _added_datafiles: List[DataFile] + _existing_datafiles: List[DataFile] + _commit_uuid: uuid.UUID + + def __init__(self, operation: Operation, table: Table, snapshot_id: int, parent_snapshot_id: Optional[int]) -> None: + self._operation = operation + self._table = table + self._snapshot_id = snapshot_id + self._parent_snapshot_id = parent_snapshot_id + self._added_datafiles = [] + self._existing_datafiles = [] + self._commit_uuid = uuid.uuid4() + + def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: + if added: + self._added_datafiles.append(data_file) + else: + self._existing_datafiles.append(data_file) + return self + + def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + """Rewrites a manifest file with a new snapshot-id. 
+ + Args: + manifest_file: The existing manifest file + + Returns: + New manifest file with the current snapshot-id + """ + output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) + with write_manifest( + format_version=self._table.format_version, + spec=self._table.specs()[manifest_file.partition_spec_id], + schema=self._table.schema(), + output_file=self._table.io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for entry in manifest_file.fetch_manifest_entry(self._table.io, discard_deleted=True): + writer.add_entry(entry) + + return writer.to_manifest_file() + + def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: + ssc = SnapshotSummaryCollector() + manifests = [] + + if self._added_datafiles: + output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) + with write_manifest( + format_version=self._table.format_version, + spec=self._table.spec(), + schema=self._table.schema(), + output_file=self._table.io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for data_file in self._added_datafiles + self._existing_datafiles: + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.ADDED, + snapshot_id=self._snapshot_id, + data_sequence_number=None, + file_sequence_number=None, + data_file=data_file, + ) + ) + + for data_file in self._added_datafiles: + ssc.add_file(data_file=data_file) + + manifests.append(writer.to_manifest_file()) + + return ssc.build(), manifests + + def commit(self) -> Snapshot: + new_summary, manifests = self._manifests() + + previous_summary = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None Review Comment: ```suggestion previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None ``` Although this is only for extracting the 
previous summary, I think it might be good to use a `_snapshot` suffix in its name to be clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org