jorisvandenbossche commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r501121145



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
             return False
 
 
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+    cdef:
+        CParquetFileWriteOptions* parquet_options
+        object _properties
+
+    def update(self, **kwargs):
+        cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+        arrow_fields = {
+            "use_deprecated_int96_timestamps",
+            "coerce_timestamps",
+            "allow_truncated_timestamps",
+        }
+
+        update = False
+        update_arrow = False
+        for name, value in kwargs.items():
+            assert name in self._properties
+            self._properties[name] = value
+            if name in arrow_fields:
+                update_arrow = True
+            else:
+                update = True
+
+        if update:
+            opts.writer_properties = _create_writer_properties(
+                use_dictionary=self.use_dictionary,
+                compression=self.compression,
+                version=self.version,
+                write_statistics=self.write_statistics,
+                data_page_size=self.data_page_size,
+                compression_level=self.compression_level,
+                use_byte_stream_split=self.use_byte_stream_split,
+                data_page_version=self.data_page_version,
+            )
+
+        if update_arrow:
+            opts.arrow_writer_properties = _create_arrow_writer_properties(
+                use_deprecated_int96_timestamps=(
+                    self.use_deprecated_int96_timestamps
+                ),
+                coerce_timestamps=self.coerce_timestamps,
+                allow_truncated_timestamps=self.allow_truncated_timestamps,
+                writer_engine_version=self.writer_engine_version,
+            )
+
+    @property
+    def use_dictionary(self):
+        return self._properties['use_dictionary']
+
+    @use_dictionary.setter
+    def use_dictionary(self, use_dictionary):
+        self.update(use_dictionary=use_dictionary)

Review comment:
       Are those getters/setters needed for the implementation, or only added for user convenience?
   Since we don't test them specifically, I would perhaps rather leave them out for now (to limit the API surface we need to maintain; we can always add them later if there is user demand).

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
             return False
 
 
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+    cdef:
+        CParquetFileWriteOptions* parquet_options
+        object _properties
+
+    def update(self, **kwargs):
+        cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+        arrow_fields = {
+            "use_deprecated_int96_timestamps",
+            "coerce_timestamps",
+            "allow_truncated_timestamps",
+        }
+
+        update = False
+        update_arrow = False
+        for name, value in kwargs.items():
+            assert name in self._properties

Review comment:
       Instead of an assert, maybe raise a TypeError indicating that such a keyword is not supported?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
             return False
 
 
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+    cdef:
+        CParquetFileWriteOptions* parquet_options
+        object _properties
+
+    def update(self, **kwargs):
+        cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+        arrow_fields = {
+            "use_deprecated_int96_timestamps",
+            "coerce_timestamps",
+            "allow_truncated_timestamps",
+        }
+
+        update = False
+        update_arrow = False
+        for name, value in kwargs.items():
+            assert name in self._properties

Review comment:
       ```suggestion
            if name not in self._properties:
                raise TypeError(
                    "unexpected Parquet write option: {}".format(name))
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression partition_expression):
 
 
 def _filesystemdataset_write(
-    data, object base_dir, Schema schema not None,
-    FileFormat format not None, FileSystem filesystem not None,
-    Partitioning partitioning not None, bint use_threads=True,
+    data not None, object base_dir not None, str basename_template not None,
+    Schema schema not None, FileFormat format not None,
+    FileSystem filesystem not None, Partitioning partitioning not None,
+    FileWriteOptions file_options not None, bint use_threads,
 ):
     """
     CFileSystemDataset.Write wrapper
     """
     cdef:
-        c_string c_base_dir
-        shared_ptr[CSchema] c_schema
-        shared_ptr[CFileFormat] c_format
-        shared_ptr[CFileSystem] c_filesystem
-        shared_ptr[CPartitioning] c_partitioning
-        shared_ptr[CScanContext] c_context
-        # to create iterator of InMemory fragments
+        CFileSystemDatasetWriteOptions c_options
+        shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
-        shared_ptr[CFragment] c_fragment
-        vector[shared_ptr[CFragment]] c_fragment_vector
 
-    c_base_dir = tobytes(_stringify_path(base_dir))
-    c_schema = pyarrow_unwrap_schema(schema)
-    c_format = format.unwrap()
-    c_filesystem = filesystem.unwrap()
-    c_partitioning = partitioning.unwrap()
-    c_context = _build_scan_context(use_threads=use_threads)
+    c_options.file_write_options = file_options.unwrap()
+    c_options.filesystem = filesystem.unwrap()
+    c_options.base_dir = tobytes(_stringify_path(base_dir))
+    c_options.partitioning = partitioning.unwrap()
+    c_options.basename_template = tobytes(basename_template)

Review comment:
       The `FileFormat` is not part of the `c_options`? It looks like that argument is now ignored?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression partition_expression):
 
 
 def _filesystemdataset_write(
-    data, object base_dir, Schema schema not None,
-    FileFormat format not None, FileSystem filesystem not None,
-    Partitioning partitioning not None, bint use_threads=True,
+    data not None, object base_dir not None, str basename_template not None,
+    Schema schema not None, FileFormat format not None,
+    FileSystem filesystem not None, Partitioning partitioning not None,
+    FileWriteOptions file_options not None, bint use_threads,
 ):
     """
     CFileSystemDataset.Write wrapper
     """
     cdef:
-        c_string c_base_dir
-        shared_ptr[CSchema] c_schema
-        shared_ptr[CFileFormat] c_format
-        shared_ptr[CFileSystem] c_filesystem
-        shared_ptr[CPartitioning] c_partitioning
-        shared_ptr[CScanContext] c_context
-        # to create iterator of InMemory fragments
+        CFileSystemDatasetWriteOptions c_options
+        shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
-        shared_ptr[CFragment] c_fragment
-        vector[shared_ptr[CFragment]] c_fragment_vector
 
-    c_base_dir = tobytes(_stringify_path(base_dir))
-    c_schema = pyarrow_unwrap_schema(schema)
-    c_format = format.unwrap()
-    c_filesystem = filesystem.unwrap()
-    c_partitioning = partitioning.unwrap()
-    c_context = _build_scan_context(use_threads=use_threads)
+    c_options.file_write_options = file_options.unwrap()
+    c_options.filesystem = filesystem.unwrap()
+    c_options.base_dir = tobytes(_stringify_path(base_dir))
+    c_options.partitioning = partitioning.unwrap()
+    c_options.basename_template = tobytes(basename_template)

Review comment:
       OK, I suppose the format is embedded in the `FileWriteOptions`. But then the `FileFormat` should not be passed here, I think? And we should also check that the `FileFormat` and `FileWriteOptions` don't conflict if both are passed?
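A minimal sketch of the kind of check I mean at the `write_dataset()` level
(the `make_write_options()` helper, the `file_options.format` attribute, and
the equality comparison are assumptions about what the options expose, not
something confirmed by this diff):

```python
# Hedged sketch: derive the write options from `format` when none are given,
# and reject a conflicting combination when both are passed.
if file_options is None:
    file_options = format.make_write_options()  # assumed helper
elif file_options.format != format:  # assumed attribute / comparison
    raise ValueError(
        "The given FileWriteOptions were not created by the FileFormat "
        "passed to write_dataset"
    )
```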

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression partition_expression):
 
 
 def _filesystemdataset_write(
-    data, object base_dir, Schema schema not None,
-    FileFormat format not None, FileSystem filesystem not None,
-    Partitioning partitioning not None, bint use_threads=True,
+    data not None, object base_dir not None, str basename_template not None,
+    Schema schema not None, FileFormat format not None,
+    FileSystem filesystem not None, Partitioning partitioning not None,
+    FileWriteOptions file_options not None, bint use_threads,
 ):
     """
     CFileSystemDataset.Write wrapper
     """
     cdef:
-        c_string c_base_dir
-        shared_ptr[CSchema] c_schema
-        shared_ptr[CFileFormat] c_format
-        shared_ptr[CFileSystem] c_filesystem
-        shared_ptr[CPartitioning] c_partitioning
-        shared_ptr[CScanContext] c_context
-        # to create iterator of InMemory fragments
+        CFileSystemDatasetWriteOptions c_options
+        shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
-        shared_ptr[CFragment] c_fragment
-        vector[shared_ptr[CFragment]] c_fragment_vector
 
-    c_base_dir = tobytes(_stringify_path(base_dir))
-    c_schema = pyarrow_unwrap_schema(schema)
-    c_format = format.unwrap()
-    c_filesystem = filesystem.unwrap()
-    c_partitioning = partitioning.unwrap()
-    c_context = _build_scan_context(use_threads=use_threads)
+    c_options.file_write_options = file_options.unwrap()
+    c_options.filesystem = filesystem.unwrap()
+    c_options.base_dir = tobytes(_stringify_path(base_dir))
+    c_options.partitioning = partitioning.unwrap()
+    c_options.basename_template = tobytes(basename_template)
 
     if isinstance(data, Dataset):
-        with nogil:
-            check_status(
-                CFileSystemDataset.Write(
-                    c_schema,
-                    c_format,
-                    c_filesystem,
-                    c_base_dir,
-                    c_partitioning,
-                    c_context,
-                    (<Dataset> data).dataset.GetFragments()
-                )
-            )
+        scanner = data._scanner(use_threads=use_threads)
     else:
-        # data is list of batches/tables, one element per fragment
+        # data is list of batches/tables
         for table in data:
             if isinstance(table, Table):
                 for batch in table.to_batches():
                     c_batches.push_back((<RecordBatch> batch).sp_batch)
             else:
                 c_batches.push_back((<RecordBatch> table).sp_batch)
 
-            c_fragment = shared_ptr[CFragment](
-                new CInMemoryFragment(c_batches, _true.unwrap()))
-            c_batches.clear()
-            c_fragment_vector.push_back(c_fragment)
+        data = Fragment.wrap(shared_ptr[CFragment](
+            new CInMemoryFragment(move(c_batches), _true.unwrap())))

Review comment:
       Does this change behaviour? It seems you are now creating a single 
fragment instead of a vector of fragments?
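To make the question concrete, a toy pure-Python analogy (no pyarrow
involved) of how many fragments the same input produces under each grouping:

```python
# Toy stand-ins: a "table" is a list of record batches, and a "fragment" is
# simply the list of batches it holds.
def batches_of(table):
    return list(table)

data = [["batch-0", "batch-1"], ["batch-2"]]  # two input tables

# old behaviour: one in-memory fragment per table
old_fragments = [batches_of(table) for table in data]

# new behaviour: all batches collected into a single in-memory fragment
new_fragments = [[batch for table in data for batch in batches_of(table)]]

assert len(old_fragments) == 2
assert len(new_fragments) == 1
```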

##########
File path: python/pyarrow/dataset.py
##########
@@ -741,6 +746,12 @@ def write_dataset(data, base_dir, format=None, partitioning=None, schema=None,
         )
 
     format = _ensure_format(format)
+    if basename_template is None:
+        basename_template = "dat_{i}." + format.default_extname

Review comment:
       Maybe we could go with `"part-{i}."` for the default name (that seems 
more in line with eg dask / spark) 
   (I can also do this in a follow-up PR, if we want this)
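A minimal sketch of the suggested default, reusing the
`format.default_extname` logic already in this hunk:

```python
# Hedged sketch: "part-{i}" prefix instead of "dat_{i}", in line with the
# naming used by e.g. dask and spark.
if basename_template is None:
    basename_template = "part-{i}." + format.default_extname
```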

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
             return False
 
 
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+    cdef:
+        CParquetFileWriteOptions* parquet_options
+        object _properties
+
+    def update(self, **kwargs):
+        cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+        arrow_fields = {
+            "use_deprecated_int96_timestamps",
+            "coerce_timestamps",
+            "allow_truncated_timestamps",
+        }
+
+        update = False
+        update_arrow = False
+        for name, value in kwargs.items():
+            assert name in self._properties
+            self._properties[name] = value
+            if name in arrow_fields:
+                update_arrow = True
+            else:
+                update = True
+
+        if update:
+            opts.writer_properties = _create_writer_properties(
+                use_dictionary=self.use_dictionary,
+                compression=self.compression,
+                version=self.version,
+                write_statistics=self.write_statistics,
+                data_page_size=self.data_page_size,
+                compression_level=self.compression_level,
+                use_byte_stream_split=self.use_byte_stream_split,
+                data_page_version=self.data_page_version,
+            )
+
+        if update_arrow:
+            opts.arrow_writer_properties = _create_arrow_writer_properties(
+                use_deprecated_int96_timestamps=(
+                    self.use_deprecated_int96_timestamps
+                ),
+                coerce_timestamps=self.coerce_timestamps,
+                allow_truncated_timestamps=self.allow_truncated_timestamps,
+                writer_engine_version=self.writer_engine_version,
+            )
+
+    @property
+    def use_dictionary(self):
+        return self._properties['use_dictionary']
+
+    @use_dictionary.setter
+    def use_dictionary(self, use_dictionary):
+        self.update(use_dictionary=use_dictionary)

Review comment:
       Ah, I see now that they are actually used in `update` to get the values out of `_properties`. But it's only the property getter, not the setter, that is used? (and `update` could in principle read `_properties` directly there)
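A hedged sketch of what I mean by reading `_properties` directly inside
`update()` (a fragment of the method body, using only the keys already shown
in the diff above):

```python
# Hedged sketch: pull the stored values straight out of the `_properties`
# dict, so the public property getters are not required for the
# implementation itself.
if update:
    opts.writer_properties = _create_writer_properties(
        use_dictionary=self._properties["use_dictionary"],
        compression=self._properties["compression"],
        version=self._properties["version"],
        write_statistics=self._properties["write_statistics"],
        data_page_size=self._properties["data_page_size"],
        compression_level=self._properties["compression_level"],
        use_byte_stream_split=self._properties["use_byte_stream_split"],
        data_page_version=self._properties["data_page_version"],
    )
```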



