alexandre-normand commented on code in PR #721:
URL: https://github.com/apache/iceberg-go/pull/721#discussion_r2850530628
##########
table/arrow_utils.go:
##########
@@ -1358,12 +1370,87 @@ func recordsToDataFiles(ctx context.Context,
rootLocation string, meta *Metadata
}
}
- return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
- } else {
- rollingDataWriters := NewWriterFactory(rootLocation, args,
meta, taskSchema, targetFileSize)
- partitionWriter := newPartitionedFanoutWriter(*currentSpec,
meta.CurrentSchema(), args.itr, &rollingDataWriters)
- workers := config.EnvConfig.MaxWorkers
+ return cw.writeFiles(ctx, rootLocation, args.fs, meta,
meta.props, nil, tasks)
+ }
+
+ factory := NewWriterFactory(rootLocation, args, meta, taskSchema,
targetFileSize)
+ partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw,
meta.CurrentSchema(), args.itr, &factory)
+ workers := config.EnvConfig.MaxWorkers
- return partitionWriter.Write(ctx, workers)
+ return partitionWriter.Write(ctx, workers)
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation
string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any,
args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
+ if args.counter == nil {
+ args.counter = internal.Counter(0)
}
+
+ defer func() {
+ if r := recover(); r != nil {
+ var err error
+ switch e := r.(type) {
+ case error:
+ err = fmt.Errorf("error encountered during
position delete file writing: %w", e)
+ default:
+ err = fmt.Errorf("error encountered during
position delete file writing: %v", e)
+ }
+ ret = func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
+ }
+ }()
+
+ if args.writeUUID == nil {
+ u := uuid.Must(uuid.NewRandom())
+ args.writeUUID = &u
+ }
+
+ targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+ WriteTargetFileSizeBytesDefault))
+
+ currentSpec, err := meta.CurrentSpec()
Review Comment:
I tried to address this, but let me know whether it's correct. I extended the
per-data-file mapping to include the spec ID in addition to the partition data
we were already tracking there. The mapping is now passed down, and the position
delete fanout writer uses the partition spec from the partition context.
That's done in 83c50c6f542bf085b0d54b271d9f2720153efc55. The one case I'm
unsure about is the one where the table's latest metadata is unpartitioned. In
that case, we were short-circuiting early and skipping the fanout writer. That
short-circuit remains, but I'm not sure what the ramifications are.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]