HonahX commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1435031878


##########
pyiceberg/table/__init__.py:
##########
@@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int:
     snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
     return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+    write_uuid: uuid.UUID
+    task_id: int
+    df: pa.Table
+    sort_order_id: Optional[int] = None
+
+    # Later to be extended with partition information
+
+    def generate_datafile_filename(self, extension: str) -> str:
+        # Mimics the behavior in the Java API:
+        # 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+    return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+    # Mimics the behavior in Java:
+    # 
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+    return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+    from pyiceberg.io.pyarrow import write_file
+
+    write_uuid = uuid.uuid4()
+    counter = itertools.count(0)
+
+    # This is an iter, so we don't have to materialize everything every time
+    # This will be more relevant when we start doing partitioned writes
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+    _operation: Operation
+    _table: Table
+    _snapshot_id: int
+    _parent_snapshot_id: Optional[int]
+    _added_datafiles: List[DataFile]
+    _existing_datafiles: List[DataFile]
+    _commit_uuid: uuid.UUID
+
+    def __init__(self, operation: Operation, table: Table, snapshot_id: int, 
parent_snapshot_id: Optional[int]) -> None:
+        self._operation = operation
+        self._table = table
+        self._snapshot_id = snapshot_id
+        self._parent_snapshot_id = parent_snapshot_id

Review Comment:
   Instead of passing `parent_snapshot_id` as an argument, I wonder if we could 
replace it by a `branch_name: str = "main"` :
   ```python
   def __init__(self, operation: Operation, table: Table, snapshot_id: int, 
branch_name: str = "main") -> None:
     ...
     self._parent_snapshot = self._table.current_snapshot() # only support main 
in the first version
     self._parent_snapshot_id = self._parent_snapshot.snapshot_id if 
self._parent_snapshot else None
   ```
   This way we only call `snapshot_by_id` once and can avoid duplicate code 
and `None` checks in other places.
   
   When adding branch support later, we could replace 
`self._table.current_snapshot()` by something similar to 
   
https://github.com/apache/iceberg/blob/a4d47567e1fef44f4443250537f09dc73a1f7583/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L493-L503
   
   Does this sound reasonable to you? Maybe this can be discussed later in the 
PR for branch support.



##########
pyiceberg/table/__init__.py:
##########
@@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int:
     snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
     return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+    write_uuid: uuid.UUID
+    task_id: int
+    df: pa.Table
+    sort_order_id: Optional[int] = None
+
+    # Later to be extended with partition information
+
+    def generate_datafile_filename(self, extension: str) -> str:
+        # Mimics the behavior in the Java API:
+        # 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+    return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+    # Mimics the behavior in Java:
+    # 
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+    return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+    from pyiceberg.io.pyarrow import write_file
+
+    write_uuid = uuid.uuid4()
+    counter = itertools.count(0)
+
+    # This is an iter, so we don't have to materialize everything every time
+    # This will be more relevant when we start doing partitioned writes
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+    _operation: Operation
+    _table: Table
+    _snapshot_id: int
+    _parent_snapshot_id: Optional[int]
+    _added_datafiles: List[DataFile]
+    _existing_datafiles: List[DataFile]
+    _commit_uuid: uuid.UUID
+
+    def __init__(self, operation: Operation, table: Table, snapshot_id: int, 
parent_snapshot_id: Optional[int]) -> None:
+        self._operation = operation
+        self._table = table
+        self._snapshot_id = snapshot_id
+        self._parent_snapshot_id = parent_snapshot_id
+        self._added_datafiles = []
+        self._existing_datafiles = []
+        self._commit_uuid = uuid.uuid4()
+
+    def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+        if added:
+            self._added_datafiles.append(data_file)
+        else:
+            self._existing_datafiles.append(data_file)
+        return self
+
+    def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile:

Review Comment:
   Looks like this is not used. Out of curiosity, will there be any use case for 
this in the future? I think we chose to append existing data files instead of 
copying existing manifest files.



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1447,18 +1452,15 @@ def parquet_path_to_id_mapping(
 
 
 def fill_parquet_file_metadata(
-    df: DataFile,
+    datafile: DataFile,

Review Comment:
   ```suggestion
       data_file: DataFile,
   ```
   Shall we rename this to `data_file`?



##########
pyiceberg/table/__init__.py:
##########
@@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int:
     snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
     return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+    write_uuid: uuid.UUID
+    task_id: int
+    df: pa.Table
+    sort_order_id: Optional[int] = None
+
+    # Later to be extended with partition information
+
+    def generate_datafile_filename(self, extension: str) -> str:
+        # Mimics the behavior in the Java API:
+        # 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+    return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+    # Mimics the behavior in Java:
+    # 
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+    return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+    from pyiceberg.io.pyarrow import write_file
+
+    write_uuid = uuid.uuid4()
+    counter = itertools.count(0)
+
+    # This is an iter, so we don't have to materialize everything every time
+    # This will be more relevant when we start doing partitioned writes
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+    _operation: Operation
+    _table: Table
+    _snapshot_id: int
+    _parent_snapshot_id: Optional[int]
+    _added_datafiles: List[DataFile]
+    _existing_datafiles: List[DataFile]
+    _commit_uuid: uuid.UUID
+
+    def __init__(self, operation: Operation, table: Table, snapshot_id: int, 
parent_snapshot_id: Optional[int]) -> None:
+        self._operation = operation
+        self._table = table
+        self._snapshot_id = snapshot_id
+        self._parent_snapshot_id = parent_snapshot_id
+        self._added_datafiles = []
+        self._existing_datafiles = []
+        self._commit_uuid = uuid.uuid4()
+
+    def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+        if added:
+            self._added_datafiles.append(data_file)
+        else:
+            self._existing_datafiles.append(data_file)
+        return self
+
+    def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        """Rewrites a manifest file with a new snapshot-id.
+
+        Args:
+            manifest_file: The existing manifest file
+
+        Returns:
+            New manifest file with the current snapshot-id
+        """
+        output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+        with write_manifest(
+            format_version=self._table.format_version,
+            spec=self._table.specs()[manifest_file.partition_spec_id],
+            schema=self._table.schema(),
+            output_file=self._table.io.new_output(output_file_location),
+            snapshot_id=self._snapshot_id,
+        ) as writer:
+            for entry in manifest_file.fetch_manifest_entry(self._table.io, 
discard_deleted=True):
+                writer.add_entry(entry)
+
+        return writer.to_manifest_file()
+
+    def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+        ssc = SnapshotSummaryCollector()
+        manifests = []
+
+        if self._added_datafiles:
+            output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+            with write_manifest(
+                format_version=self._table.format_version,
+                spec=self._table.spec(),
+                schema=self._table.schema(),
+                output_file=self._table.io.new_output(output_file_location),
+                snapshot_id=self._snapshot_id,
+            ) as writer:
+                for data_file in self._added_datafiles + 
self._existing_datafiles:
+                    writer.add_entry(
+                        ManifestEntry(
+                            status=ManifestEntryStatus.ADDED,
+                            snapshot_id=self._snapshot_id,
+                            data_sequence_number=None,
+                            file_sequence_number=None,
+                            data_file=data_file,
+                        )
+                    )
+
+                for data_file in self._added_datafiles:
+                    ssc.add_file(data_file=data_file)
+
+            manifests.append(writer.to_manifest_file())
+
+        return ssc.build(), manifests
+
+    def commit(self) -> Snapshot:
+        new_summary, manifests = self._manifests()
+
+        previous_summary = 
self._table.snapshot_by_id(self._parent_snapshot_id) if 
self._parent_snapshot_id is not None else None

Review Comment:
   ```suggestion
           previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id) if 
self._parent_snapshot_id is not None else None
   ```
   Although this is only used for extracting the previous summary, I think it 
might be good to use `_snapshot` in its name to be clear.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to