arnaudbriche commented on code in PR #354:
URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2007259597
##########
table/transaction.go:
##########
@@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader,
snapshotProps iceberg.Prope
return iceberg.ErrNotImplemented
}
+type MergeOp struct {
+ OutputFile string
+ InputFiles []string
+}
+
+func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps
iceberg.Properties) error {
+ var (
+ inputFiles = make(map[string]bool)
+ outputFiles = make(map[string]struct{})
+ numInputFiles int
+ numOutputFiles int
+ )
+
+ for _, op := range ops {
+ if len(op.InputFiles) <= 1 {
+ return fmt.Errorf("merge operation must have at least 2 input files (%d)", len(op.InputFiles))
+ }
+
+ outputFiles[op.OutputFile] = struct{}{}
+ numOutputFiles++
+
+ for _, f := range op.InputFiles {
+ inputFiles[f] = false
+ numInputFiles++
+ }
+ }
+
+ if len(outputFiles) != numOutputFiles {
+ return errors.New("duplicate output files")
+ }
+
+ if len(inputFiles) != numInputFiles {
+ return errors.New("duplicate input files")
+ }
+
+ s := t.meta.currentSnapshot()
+
+ if s == nil {
+ return errors.New("merge operation requires an existing snapshot")
+ }
+
+ manifestFiles, err := s.Manifests(t.tbl.fs)
+ if err != nil {
+ return err
+ }
+
+ var (
+ existingManifestFiles []iceberg.ManifestFile // manifest that don't contain any input files
+ existingDataFiles []iceberg.DataFile // existing data file entries in manifest that contain some input files
+ deletedManifestEntries []iceberg.ManifestEntry
+ deletedManifestFiles []iceberg.ManifestFile
+ )
+
+ for _, manifestFile := range manifestFiles {
+ entries, err := manifestFile.FetchEntries(t.tbl.fs, false)
+ if err != nil {
+ return err
+ }
+
+ var (
+ isManifestFileTouched bool
+ untouchedDataFiles []iceberg.DataFile
+ )
+
+ for _, entry := range entries {
+ entry.Status()
Review Comment:
Weird indeed.
##########
table/snapshot_producers.go:
##########
@@ -92,6 +92,65 @@ func (fa *fastAppendFiles) deletedEntries()
([]iceberg.ManifestEntry, error) {
return nil, nil
}
+func newMergeFilesProducer(
+ op Operation,
+ txn *Transaction,
+ fs iceio.WriteFileIO,
+ commitUUID *uuid.UUID,
+ snapshotProps iceberg.Properties,
+ existingManifestFiles []iceberg.ManifestFile,
+ deletedManifestFiles []iceberg.ManifestFile,
+ deletedManifestEntries []iceberg.ManifestEntry,
+) *snapshotProducer {
+ prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+ prod.producerImpl = &mergeFiles{
+ base: prod,
+ existingManifestFiles: existingManifestFiles,
+ deletedManifestFiles: deletedManifestFiles,
+ deletedManifestEntries: deletedManifestEntries,
+ }
+
+ return prod
+}
+
+type mergeFiles struct {
+ base *snapshotProducer
+
+ existingManifestFiles []iceberg.ManifestFile
+ deletedManifestFiles []iceberg.ManifestFile
+ deletedManifestEntries []iceberg.ManifestEntry
+}
+
+func (mf *mergeFiles) processManifests(manifests []iceberg.ManifestFile)
([]iceberg.ManifestFile, error) {
+ var manifestsToKeep []iceberg.ManifestFile
+
+ for _, manifest := range manifests {
+ var found bool
+
+ for _, deletedManifest := range mf.deletedManifestFiles {
+ if manifest.FilePath() == deletedManifest.FilePath() {
+ found = true
+
+ break
+ }
+ }
+
+ if !found {
+ manifestsToKeep = append(manifestsToKeep, manifest)
+ }
+ }
Review Comment:
Ok, I'm a bit confused now; I was pretty sure `ManifestContentDeletes`
refers to parquet files containing deleted row masks, not a manifest containing
a list of delete files.
I hacked this together without really understanding the way the `producerImpl`
interface is meant to be used, and I think it would be really beneficial to get
some intel from you.
Here is how I understood it:
```go
type producerImpl interface {
// called with the full list of manifests for the snapshot at the end
// of the process
// is it here that I am supposed to get rid of the manifest files
// containing some merge input files?
processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error)
// why would it differ by implementation of `producerImpl`?
// isn't it supposed to be the list of manifests for the parent
// snapshot every time?
existingManifests() ([]iceberg.ManifestFile, error)
// used to maintain table statistics but not to remove manifests
// containing those entries?
deletedEntries() ([]iceberg.ManifestEntry, error)
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]