bkietz commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r501159050



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression partition_expression):
 
 
 def _filesystemdataset_write(
-    data, object base_dir, Schema schema not None,
-    FileFormat format not None, FileSystem filesystem not None,
-    Partitioning partitioning not None, bint use_threads=True,
+    data not None, object base_dir not None, str basename_template not None,
+    Schema schema not None, FileFormat format not None,
+    FileSystem filesystem not None, Partitioning partitioning not None,
+    FileWriteOptions file_options not None, bint use_threads,
 ):
     """
     CFileSystemDataset.Write wrapper
     """
     cdef:
-        c_string c_base_dir
-        shared_ptr[CSchema] c_schema
-        shared_ptr[CFileFormat] c_format
-        shared_ptr[CFileSystem] c_filesystem
-        shared_ptr[CPartitioning] c_partitioning
-        shared_ptr[CScanContext] c_context
-        # to create iterator of InMemory fragments
+        CFileSystemDatasetWriteOptions c_options
+        shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
-        shared_ptr[CFragment] c_fragment
-        vector[shared_ptr[CFragment]] c_fragment_vector
 
-    c_base_dir = tobytes(_stringify_path(base_dir))
-    c_schema = pyarrow_unwrap_schema(schema)
-    c_format = format.unwrap()
-    c_filesystem = filesystem.unwrap()
-    c_partitioning = partitioning.unwrap()
-    c_context = _build_scan_context(use_threads=use_threads)
+    c_options.file_write_options = file_options.unwrap()
+    c_options.filesystem = filesystem.unwrap()
+    c_options.base_dir = tobytes(_stringify_path(base_dir))
+    c_options.partitioning = partitioning.unwrap()
+    c_options.basename_template = tobytes(basename_template)
 
     if isinstance(data, Dataset):
-        with nogil:
-            check_status(
-                CFileSystemDataset.Write(
-                    c_schema,
-                    c_format,
-                    c_filesystem,
-                    c_base_dir,
-                    c_partitioning,
-                    c_context,
-                    (<Dataset> data).dataset.GetFragments()
-                )
-            )
+        scanner = data._scanner(use_threads=use_threads)
     else:
-        # data is list of batches/tables, one element per fragment
+        # data is list of batches/tables
         for table in data:
             if isinstance(table, Table):
                 for batch in table.to_batches():
                     c_batches.push_back((<RecordBatch> batch).sp_batch)
             else:
                 c_batches.push_back((<RecordBatch> table).sp_batch)
 
-            c_fragment = shared_ptr[CFragment](
-                new CInMemoryFragment(c_batches, _true.unwrap()))
-            c_batches.clear()
-            c_fragment_vector.push_back(c_fragment)
+        data = Fragment.wrap(shared_ptr[CFragment](
+            new CInMemoryFragment(move(c_batches), _true.unwrap())))

Review comment:
       FileSystemDataset::Write now parallelizes across scan tasks rather than
across fragments, so there is no difference in performance or in the written
files even if we create a single in-memory fragment. I changed this to create
a single fragment since it's simpler.
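
       For reference, a minimal sketch of how this write path is exercised
from the public API, assuming a pyarrow build that includes this change and
that dataset.write_dataset forwards these parameters down to
_filesystemdataset_write (paths and column names below are illustrative
only):

           import pyarrow as pa
           import pyarrow.dataset as ds

           # A mixed list of tables and record batches, as handled by the
           # else branch above: all batches are collected into a single
           # InMemoryFragment before writing.
           table = pa.table({"year": [2020, 2021], "value": [1.0, 2.0]})
           data = [table, table.to_batches()[0]]

           ds.write_dataset(
               data,
               base_dir="/tmp/example_dataset",       # -> c_options.base_dir
               basename_template="part-{i}.parquet",  # -> c_options.basename_template
               format="parquet",                      # unwrapped into file_write_options
               partitioning=ds.partitioning(
                   pa.schema([("year", pa.int64())])  # directory partitioning on "year"
               ),
           )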




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
