dontirun commented on code in PR #674:
URL: https://github.com/apache/iceberg-go/pull/674#discussion_r2677556015


##########
table/transaction.go:
##########
@@ -474,6 +474,278 @@ func (t *Transaction) AddFiles(ctx context.Context, files 
[]string, snapshotProp
        return t.apply(updates, reqs)
 }
 
+// OverwriteTable overwrites the table data using an Arrow Table, optionally 
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table, 
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, 
snapshotProps iceberg.Properties) error {
+       rdr := array.NewTableReader(tbl, batchSize)
+       defer rdr.Release()
+
+       return t.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a 
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, 
filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps 
iceberg.Properties) error {
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: 
string(mappingJson)})
+               if err != nil {
+                       return err
+               }
+       }
+
+       commitUUID := uuid.New()
+       updater := t.updateSnapshot(fs, 
snapshotProps).mergeOverwrite(&commitUUID)
+
+       filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, 
fs, filter)
+       if err != nil {
+               return err
+       }
+
+       for _, df := range filesToDelete {
+               updater.deleteDataFile(df)
+       }
+
+       if len(filesToRewrite) > 0 {
+               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, filter); err != nil {
+                       return err
+               }
+       }
+
+       itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, 
recordWritingArgs{
+               sc:        rdr.Schema(),
+               itr:       array.IterFromReader(rdr),
+               fs:        fs.(io.WriteFileIO),
+               writeUUID: &updater.commitUuid,
+       })
+
+       for df, err := range itr {
+               if err != nil {
+                       return err
+               }
+               updater.appendDataFile(df)
+       }
+
+       updates, reqs, err := updater.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the 
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any 
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite 
[]iceberg.DataFile, err error) {
+       s := t.meta.currentSnapshot()
+       if s == nil {
+               return nil, nil, nil
+       }
+
+       if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+               for df, err := range s.dataFiles(fs, nil) {
+                       if err != nil {
+                               return nil, nil, err
+                       }
+                       if df.ContentType() == iceberg.EntryContentData {
+                               filesToDelete = append(filesToDelete, df)
+                       }
+               }
+               return filesToDelete, filesToRewrite, nil
+       }
+       return t.classifyFilesForFilteredOverwrite(ctx, fs, filter)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite 
operations.
+// Returns files to delete completely, files to rewrite partially, and any 
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite 
[]iceberg.DataFile, err error) {
+       schema := t.meta.CurrentSchema()
+
+       inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, 
true, false)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to create inclusive metrics 
evaluator: %w", err)
+       }
+
+       strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true, 
false)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to create strict metrics 
evaluator: %w", err)
+       }
+
+       var manifestEval func(iceberg.ManifestFile) (bool, error)
+       meta, err := t.meta.Build()
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+       }
+       spec := meta.PartitionSpec()
+       if !spec.IsUnpartitioned() {
+               manifestEval, err = newManifestEvaluator(spec, schema, filter, 
true)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed to create manifest 
evaluator: %w", err)
+               }
+       }
+
+       s := t.meta.currentSnapshot()
+       manifests, err := s.Manifests(fs)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+       }
+
+       for _, manifest := range manifests {
+               if manifestEval != nil {
+                       match, err := manifestEval(manifest)
+                       if err != nil {
+                               return nil, nil, fmt.Errorf("failed to evaluate 
manifest: %w", err)
+                       }
+                       if !match {
+                               continue
+                       }
+               }
+
+               entries, err := manifest.FetchEntries(fs, false)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed to fetch manifest 
entries: %w", err)
+               }
+
+               for _, entry := range entries {
+                       if entry.Status() == iceberg.EntryStatusDELETED {
+                               continue
+                       }
+
+                       df := entry.DataFile()
+                       if df.ContentType() != iceberg.EntryContentData {
+                               continue
+                       }
+
+                       inclusive, err := inclusiveEvaluator(df)
+                       if err != nil {
+                               return nil, nil, fmt.Errorf("failed to evaluate 
data file %s with inclusive evaluator: %w", df.FilePath(), err)
+                       }
+
+                       if !inclusive {
+                               continue
+                       }
+
+                       strict, err := strictEvaluator(df)
+                       if err != nil {
+                               return nil, nil, fmt.Errorf("failed to evaluate 
data file %s with strict evaluator: %w", df.FilePath(), err)
+                       }
+
+                       if strict {
+                               filesToDelete = append(filesToDelete, df)
+                       } else {
+                               filesToRewrite = append(filesToRewrite, df)
+                       }
+               }
+       }
+
+       return filesToDelete, filesToRewrite, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do 
NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, 
updater *snapshotProducer, files []iceberg.DataFile, filter 
iceberg.BooleanExpression) error {
+       complementFilter := iceberg.NewNot(filter)
+
+       for _, originalFile := range files {
+               rewrittenFiles, err := t.rewriteSingleFile(ctx, fs, 
originalFile, complementFilter, updater.commitUuid)
+               if err != nil {
+                       return fmt.Errorf("failed to rewrite file %s: %w", 
originalFile.FilePath(), err)
+               }
+
+               updater.deleteDataFile(originalFile)
+               for _, rewrittenFile := range rewrittenFiles {
+                       updater.appendDataFile(rewrittenFile)
+               }
+       }
+
+       return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes 
new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, 
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID 
uuid.UUID) ([]iceberg.DataFile, error) {
+       scanTask := &FileScanTask{
+               File:   originalFile,
+               Start:  0,
+               Length: originalFile.FileSizeBytes(),
+       }
+
+       boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, 
true)
+       if err != nil {
+               return nil, fmt.Errorf("failed to bind filter: %w", err)
+       }
+
+       meta, err := t.meta.Build()
+       if err != nil {
+               return nil, fmt.Errorf("failed to build metadata: %w", err)
+       }
+
+       scanner := &arrowScan{
+               metadata:        meta,
+               fs:              fs,
+               projectedSchema: t.meta.CurrentSchema(),
+               boundRowFilter:  boundFilter,
+               caseSensitive:   true,
+               rowLimit:        -1, // No limit
+               concurrency:     1,
+       }

Review Comment:
   I don't know if this is correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to