starpact commented on code in PR #799:
URL: https://github.com/apache/iceberg-go/pull/799#discussion_r2982615325


##########
table/arrow_utils.go:
##########
@@ -1343,72 +1344,84 @@ func computeStatsPlan(sc *iceberg.Schema, props 
iceberg.Properties) (map[int]tbl
        return result, nil
 }
 
-func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
-       return func(yield func(iceberg.DataFile, error) bool) {
-               defer func() {
-                       if r := recover(); r != nil {
-                               switch e := r.(type) {
-                               case string:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %s", e))
-                               case error:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %w", e))
-                               }
-                       }
-               }()
-
-               partitionSpec, err := meta.CurrentSpec()
-               if err != nil || partitionSpec == nil {
-                       yield(nil, fmt.Errorf("%w: cannot add files without a 
current spec", err))
-
-                       return
-               }
+func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, 
err error) {
+       partitionSpec, err := meta.CurrentSpec()
+       if err != nil || partitionSpec == nil {
+               return nil, fmt.Errorf("%w: cannot add files without a current 
spec", err)
+       }
 
-               currentSchema, currentSpec := meta.CurrentSchema(), 
*partitionSpec
+       currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
 
-               for filePath := range paths {
-                       format := tblutils.FormatFromFileName(filePath)
-                       rdr := must(format.Open(ctx, fileIO, filePath))
-                       // TODO: take a look at this defer Close() and consider 
refactoring
-                       defer rdr.Close()
+       dataFiles := make([]iceberg.DataFile, len(filePaths))
+       eg, ctx := errgroup.WithContext(ctx)
+       eg.SetLimit(concurrency)
+       for i, filePath := range filePaths {
+               eg.Go(func() (err error) {
+                       defer func() {
+                               if r := recover(); r != nil {
+                                       switch e := r.(type) {
+                                       case string:
+                                               err = fmt.Errorf("error 
encountered during file conversion: %s", e)
+                                       case error:
+                                               err = fmt.Errorf("error 
encountered during file conversion: %w", e)
+                                       }
+                               }
+                       }()
 
-                       arrSchema := must(rdr.Schema())
+                       dataFile, err := fileToDataFile(ctx, fileIO, filePath, 
currentSchema, currentSpec, meta.props)
+                       if err != nil {
+                               return err
+                       }
+                       dataFiles[i] = dataFile
 
-                       if err := checkArrowSchemaCompat(currentSchema, 
arrSchema, false); err != nil {
-                               yield(nil, err)
+                       return nil
+               })
+       }
 
-                               return
-                       }
+       if err := eg.Wait(); err != nil {
+               return nil, err
+       }
 
-                       pathToIDSchema := currentSchema
-                       if fileHasIDs := must(VisitArrowSchema(arrSchema, 
hasIDs{})); fileHasIDs {
-                               pathToIDSchema = 
must(ArrowSchemaToIceberg(arrSchema, false, nil))
-                       }
+       return dataFiles, nil
+}
 
-                       statistics := 
format.DataFileStatsFromMeta(rdr.Metadata(), 
must(computeStatsPlan(currentSchema, meta.props)),
-                               must(format.PathToIDMapping(pathToIDSchema)))
+func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string, 
currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props 
iceberg.Properties) (iceberg.DataFile, error) {
+       format := tblutils.FormatFromFileName(filePath)
+       rdr := must(format.Open(ctx, fileIO, filePath))

Review Comment:
   Updated.
   I wanted to use normal `error` handling here, but utilities like `statistics.ToDataFile` 
use `must` internally, so I'll follow that existing convention for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to