syun64 commented on code in PR #363:
URL: https://github.com/apache/iceberg-python/pull/363#discussion_r1626456000
##########
pyiceberg/table/__init__.py:
##########
@@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema:
Schema, arrow_table: pa.T
table_partitions: List[TablePartition] =
_get_table_partitions(arrow_table, spec, schema, slice_instructions)
return table_partitions
+
+
+class _ManifestMergeManager:
+ _target_size_bytes: int
+ _min_count_to_merge: int
+ _merge_enabled: bool
+ _snapshot_producer: _MergingSnapshotProducer
+
+ def __init__(
+ self, target_size_bytes: int, min_count_to_merge: int, merge_enabled:
bool, snapshot_producer: _MergingSnapshotProducer
+ ) -> None:
+ self._target_size_bytes = target_size_bytes
+ self._min_count_to_merge = min_count_to_merge
+ self._merge_enabled = merge_enabled
+ self._snapshot_producer = snapshot_producer
+
+ def _group_by_spec(
+ self, first_manifest: ManifestFile, remaining_manifests:
List[ManifestFile]
+ ) -> Dict[int, List[ManifestFile]]:
+ groups = defaultdict(list)
+ groups[first_manifest.partition_spec_id].append(first_manifest)
+ for manifest in remaining_manifests:
+ groups[manifest.partition_spec_id].append(manifest)
+ return groups
+
+ def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile])
-> ManifestFile:
+ with
self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id))
as writer:
+ for manifest in manifest_bin:
+ for entry in
self._snapshot_producer.fetch_manifest_entry(manifest=manifest,
discard_deleted=False):
+ if entry.status == ManifestEntryStatus.DELETED:
+ # suppress deletes from previous snapshots. only
files deleted by this snapshot
+ # should be added to the new manifest
+ if entry.snapshot_id ==
self._snapshot_producer.snapshot_id:
+ writer.delete(entry)
+ elif entry.status == ManifestEntryStatus.ADDED and
entry.snapshot_id == self._snapshot_producer.snapshot_id:
+ # adds from this snapshot are still adds, otherwise
they should be existing
+ writer.add(entry)
+ else:
+ # add all files from the old manifest as existing files
+ writer.existing(entry)
+
+ return writer.to_manifest_file()
+
+ def _merge_group(self, first_manifest: ManifestFile, spec_id: int,
manifests: List[ManifestFile]) -> List[ManifestFile]:
+ packer: ListPacker[ManifestFile] =
ListPacker(target_weight=self._target_size_bytes, lookback=1,
largest_bin_first=False)
+ bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m:
m.manifest_length)
+
+ def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
+ output_manifests = []
+ if len(manifest_bin) == 1:
+ output_manifests.append(manifest_bin[0])
+ elif first_manifest in manifest_bin and len(manifest_bin) <
self._min_count_to_merge:
+ # if the bin has the first manifest (the new data files or an
appended manifest file)
+ # then only merge it
Review Comment:
nit: this comment could use some cleanup — re-wrapping (reflowing) the comment lines would help here.
##########
pyiceberg/table/__init__.py:
##########
@@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema:
Schema, arrow_table: pa.T
table_partitions: List[TablePartition] =
_get_table_partitions(arrow_table, spec, schema, slice_instructions)
return table_partitions
+
+
+class _ManifestMergeManager:
+ _target_size_bytes: int
+ _min_count_to_merge: int
+ _merge_enabled: bool
+ _snapshot_producer: _MergingSnapshotProducer
+
+ def __init__(
+ self, target_size_bytes: int, min_count_to_merge: int, merge_enabled:
bool, snapshot_producer: _MergingSnapshotProducer
+ ) -> None:
+ self._target_size_bytes = target_size_bytes
+ self._min_count_to_merge = min_count_to_merge
+ self._merge_enabled = merge_enabled
+ self._snapshot_producer = snapshot_producer
+
+ def _group_by_spec(
+ self, first_manifest: ManifestFile, remaining_manifests:
List[ManifestFile]
+ ) -> Dict[int, List[ManifestFile]]:
+ groups = defaultdict(list)
+ groups[first_manifest.partition_spec_id].append(first_manifest)
+ for manifest in remaining_manifests:
+ groups[manifest.partition_spec_id].append(manifest)
+ return groups
+
+ def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile])
-> ManifestFile:
+ with
self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id))
as writer:
+ for manifest in manifest_bin:
+ for entry in
self._snapshot_producer.fetch_manifest_entry(manifest=manifest,
discard_deleted=False):
+ if entry.status == ManifestEntryStatus.DELETED:
+ # suppress deletes from previous snapshots. only
files deleted by this snapshot
+ # should be added to the new manifest
+ if entry.snapshot_id ==
self._snapshot_producer.snapshot_id:
Review Comment:
nit: I think we could fold this into the outer condition, consistent with
how you did it in the line below.
##########
pyiceberg/table/__init__.py:
##########
@@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema:
Schema, arrow_table: pa.T
table_partitions: List[TablePartition] =
_get_table_partitions(arrow_table, spec, schema, slice_instructions)
return table_partitions
+
+
+class _ManifestMergeManager:
+ _target_size_bytes: int
+ _min_count_to_merge: int
+ _merge_enabled: bool
+ _snapshot_producer: _MergingSnapshotProducer
+
+ def __init__(
+ self, target_size_bytes: int, min_count_to_merge: int, merge_enabled:
bool, snapshot_producer: _MergingSnapshotProducer
+ ) -> None:
+ self._target_size_bytes = target_size_bytes
+ self._min_count_to_merge = min_count_to_merge
+ self._merge_enabled = merge_enabled
+ self._snapshot_producer = snapshot_producer
+
+ def _group_by_spec(
+ self, first_manifest: ManifestFile, remaining_manifests:
List[ManifestFile]
Review Comment:
nit: would it be simpler to just take a single ordered list of manifests as
the argument for this function, since we don't have any special handling for
`first_manifest`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]