This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new dd3670583d GH-34884: [Python]: Support pickling pyarrow.dataset Partitioning subclasses (#36462)
dd3670583d is described below
commit dd3670583d6fd8b95783f89f7b80c04588e16fb1
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Fri Jul 7 14:47:39 2023 +0200
GH-34884: [Python]: Support pickling pyarrow.dataset Partitioning subclasses (#36462)
### Rationale for this change
Add support for pickling Directory/Hive/FilenamePartitioning objects.
Does not yet fully fix #34884, because this PR only addresses the concrete Partitioning subclasses, not the PartitioningFactory subclasses.
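The pickling support uses Python's standard `__reduce__` protocol: each object returns a callable plus the arguments needed to rebuild it, as the `__reduce__` methods in the diff below do. A minimal standalone sketch of the pattern (the `Partition` class here is a toy stand-in, not the real Cython type):

```python
import pickle


class Partition:
    """Toy stand-in for the real Cython Partitioning classes."""

    def __init__(self, schema, segment_encoding="uri"):
        self.schema = schema
        self.segment_encoding = segment_encoding

    def __reduce__(self):
        # pickle stores (callable, args) and calls callable(*args) on
        # unpickling, re-running __init__ with the captured state.
        return self.__class__, (self.schema, self.segment_encoding)


p = Partition(["year", "month"], segment_encoding="none")
q = pickle.loads(pickle.dumps(p))
assert (q.schema, q.segment_encoding) == (p.schema, p.segment_encoding)
```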
### Are these changes tested?
Yes
### Are there any user-facing changes?
Only new support for pickling and the `==` operation.
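Both are illustrated below (a usage sketch mirroring the tests added in this PR; assumes a pyarrow build that includes this change):

```python
import pickle

import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([
    pa.field('year', pa.int64()),
    pa.field('month', pa.int64())
])
part = ds.HivePartitioning(schema, null_fallback='xyz')

# `==` now compares the underlying C++ Partitioning objects
assert part == ds.HivePartitioning(schema, null_fallback='xyz')
assert part != ds.HivePartitioning(schema, null_fallback='other')

# pickling now round-trips
assert pickle.loads(pickle.dumps(part)) == part
```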
* Issue: #34884
Lead-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
cpp/src/arrow/dataset/partition.h | 2 +
python/pyarrow/_dataset.pyx | 40 ++++++++++++++-
python/pyarrow/includes/libarrow_dataset.pxd | 9 ++--
python/pyarrow/tests/test_dataset.py | 77 ++++++++++++++++++----------
4 files changed, 97 insertions(+), 31 deletions(-)
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index b122047c07..315a3d384d 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -187,6 +187,8 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
 
   const ArrayVector& dictionaries() const { return dictionaries_; }
 
+  SegmentEncoding segment_encoding() const { return options_.segment_encoding; }
+
   bool Equals(const Partitioning& other) const override;
 
  protected:
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index d2b5554ec1..2ab8ffb798 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -131,12 +131,20 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
 
 cdef CSegmentEncoding _get_segment_encoding(str segment_encoding):
     if segment_encoding == "none":
-        return CSegmentEncodingNone
+        return CSegmentEncoding_None
     elif segment_encoding == "uri":
-        return CSegmentEncodingUri
+        return CSegmentEncoding_Uri
     raise ValueError(f"Unknown segment encoding: {segment_encoding}")
 
 
+cdef str _wrap_segment_encoding(CSegmentEncoding segment_encoding):
+    if segment_encoding == CSegmentEncoding_None:
+        return "none"
+    elif segment_encoding == CSegmentEncoding_Uri:
+        return "uri"
+    raise ValueError("Unknown segment encoding")
+
+
 cdef Expression _true = Expression._scalar(True)
@@ -2339,6 +2347,12 @@ cdef class Partitioning(_Weakrefable):
     cdef inline shared_ptr[CPartitioning] unwrap(self):
         return self.wrapped
 
+    def __eq__(self, other):
+        try:
+            return self.partitioning.Equals(deref((<Partitioning>other).unwrap()))
+        except TypeError:
+            return False
+
     def parse(self, path):
         cdef CResult[CExpression] result
         result = self.partitioning.Parse(tobytes(path))
@@ -2393,6 +2407,7 @@ cdef vector[shared_ptr[CArray]] _partitioning_dictionaries(
     return c_dictionaries
 
 
+
 cdef class KeyValuePartitioning(Partitioning):
 
     cdef:
@@ -2407,6 +2422,15 @@ cdef class KeyValuePartitioning(Partitioning):
         self.wrapped = sp
         self.partitioning = sp.get()
 
+    def __reduce__(self):
+        dictionaries = self.dictionaries
+        if dictionaries:
+            dictionaries = dict(zip(self.schema.names, dictionaries))
+        segment_encoding = _wrap_segment_encoding(
+            deref(self.keyvalue_partitioning).segment_encoding()
+        )
+        return self.__class__, (self.schema, dictionaries, segment_encoding)
+
     @property
     def dictionaries(self):
         """
@@ -2620,6 +2644,18 @@ cdef class HivePartitioning(KeyValuePartitioning):
         KeyValuePartitioning.init(self, sp)
         self.hive_partitioning = <CHivePartitioning*> sp.get()
 
+    def __reduce__(self):
+        dictionaries = self.dictionaries
+        if dictionaries:
+            dictionaries = dict(zip(self.schema.names, dictionaries))
+        segment_encoding = _wrap_segment_encoding(
+            deref(self.keyvalue_partitioning).segment_encoding()
+        )
+        null_fallback = frombytes(deref(self.hive_partitioning).null_fallback())
+        return HivePartitioning, (
+            self.schema, dictionaries, null_fallback, segment_encoding
+        )
+
     @staticmethod
     def discover(infer_dictionary=False,
                  max_partition_dictionary_size=0,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 201fb78217..8901d763e3 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -288,13 +288,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         c_string type_name() const
         CResult[CExpression] Parse(const c_string & path) const
         const shared_ptr[CSchema] & schema()
+        c_bool Equals(const CPartitioning& other) const
 
     cdef cppclass CSegmentEncoding" arrow::dataset::SegmentEncoding":
-        pass
+        bint operator==(CSegmentEncoding)
 
-    CSegmentEncoding CSegmentEncodingNone\
+    CSegmentEncoding CSegmentEncoding_None\
         " arrow::dataset::SegmentEncoding::None"
-    CSegmentEncoding CSegmentEncodingUri\
+    CSegmentEncoding CSegmentEncoding_Uri\
         " arrow::dataset::SegmentEncoding::Uri"
 
     cdef cppclass CKeyValuePartitioningOptions \
@@ -329,6 +330,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
                               CKeyValuePartitioningOptions options)
         vector[shared_ptr[CArray]] dictionaries() const
+        CSegmentEncoding segment_encoding()
 
     cdef cppclass CDirectoryPartitioning \
             "arrow::dataset::DirectoryPartitioning"(CPartitioning):
@@ -352,6 +354,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
                           CHivePartitioningFactoryOptions)
         vector[shared_ptr[CArray]] dictionaries() const
+        c_string null_fallback() const
 
     cdef cppclass CFilenamePartitioning \
             "arrow::dataset::FilenamePartitioning"(CPartitioning):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 144da21cf5..d939af662f 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -588,13 +588,13 @@ def test_partitioning():
                   ds.FilenamePartitioning]:
         partitioning = klass(schema)
         assert isinstance(partitioning, ds.Partitioning)
+        assert partitioning == klass(schema)
 
-    partitioning = ds.DirectoryPartitioning(
-        pa.schema([
-            pa.field('group', pa.int64()),
-            pa.field('key', pa.float64())
-        ])
-    )
+    schema = pa.schema([
+        pa.field('group', pa.int64()),
+        pa.field('key', pa.float64())
+    ])
+    partitioning = ds.DirectoryPartitioning(schema)
     assert len(partitioning.dictionaries) == 2
     assert all(x is None for x in partitioning.dictionaries)
     expr = partitioning.parse('/3/3.14/')
@@ -610,13 +610,13 @@ def test_partitioning():
     expected = ds.field('group') == 3
     assert expr.equals(expected)
 
-    partitioning = ds.HivePartitioning(
-        pa.schema([
-            pa.field('alpha', pa.int64()),
-            pa.field('beta', pa.int64())
-        ]),
-        null_fallback='xyz'
-    )
+    assert partitioning != ds.DirectoryPartitioning(schema, segment_encoding="none")
+
+    schema = pa.schema([
+        pa.field('alpha', pa.int64()),
+        pa.field('beta', pa.int64())
+    ])
+    partitioning = ds.HivePartitioning(schema, null_fallback='xyz')
     assert len(partitioning.dictionaries) == 2
     assert all(x is None for x in partitioning.dictionaries)
     expr = partitioning.parse('/alpha=0/beta=3/')
@@ -636,12 +636,13 @@ def test_partitioning():
     with pytest.raises(pa.ArrowInvalid):
         partitioning.parse(shouldfail)
 
-    partitioning = ds.FilenamePartitioning(
-        pa.schema([
-            pa.field('group', pa.int64()),
-            pa.field('key', pa.float64())
-        ])
-    )
+    assert partitioning != ds.HivePartitioning(schema, null_fallback='other')
+
+    schema = pa.schema([
+        pa.field('group', pa.int64()),
+        pa.field('key', pa.float64())
+    ])
+    partitioning = ds.FilenamePartitioning(schema)
     assert len(partitioning.dictionaries) == 2
     assert all(x is None for x in partitioning.dictionaries)
     expr = partitioning.parse('3_3.14_')
@@ -653,17 +654,19 @@ def test_partitioning():
     with pytest.raises(pa.ArrowInvalid):
         partitioning.parse('prefix_3_aaa_')
 
+    assert partitioning != ds.FilenamePartitioning(schema, segment_encoding="none")
+
+    schema = pa.schema([
+        pa.field('group', pa.int64()),
+        pa.field('key', pa.dictionary(pa.int8(), pa.string()))
+    ])
     partitioning = ds.DirectoryPartitioning(
-        pa.schema([
-            pa.field('group', pa.int64()),
-            pa.field('key', pa.dictionary(pa.int8(), pa.string()))
-        ]),
-        dictionaries={
-            "key": pa.array(["first", "second", "third"]),
-        })
+        schema, dictionaries={"key": pa.array(["first", "second", "third"])}
+    )
     assert partitioning.dictionaries[0] is None
     assert partitioning.dictionaries[1].to_pylist() == [
         "first", "second", "third"]
+    assert partitioning != ds.DirectoryPartitioning(schema, dictionaries=None)
 
     partitioning = ds.FilenamePartitioning(
         pa.schema([
@@ -696,6 +699,24 @@ def test_partitioning():
     assert load_back_table.equals(table)
 
 
+def test_partitioning_pickling():
+    schema = pa.schema([
+        pa.field('i64', pa.int64()),
+        pa.field('f64', pa.float64())
+    ])
+    parts = [
+        ds.DirectoryPartitioning(schema),
+        ds.HivePartitioning(schema),
+        ds.FilenamePartitioning(schema),
+        ds.DirectoryPartitioning(schema, segment_encoding="none"),
+        ds.FilenamePartitioning(schema, segment_encoding="none"),
+        ds.HivePartitioning(schema, segment_encoding="none", null_fallback="xyz"),
+    ]
+
+    for part in parts:
+        assert pickle.loads(pickle.dumps(part)) == part
+
+
 def test_expression_arithmetic_operators():
     dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]}))
     a = ds.field("a")
@@ -3740,6 +3761,10 @@ def test_dataset_preserved_partitioning(tempdir):
     _, path = _create_single_file(tempdir)
     dataset = ds.dataset(path)
     assert isinstance(dataset.partitioning, ds.DirectoryPartitioning)
+    # TODO(GH-34884) partitioning attribute not preserved in pickling
+    # dataset_ = ds.dataset(path)
+    # for dataset in [dataset_, pickle.loads(pickle.dumps(dataset_))]:
+    #     assert isinstance(dataset.partitioning, ds.DirectoryPartitioning)
 
     # through discovery, with hive partitioning but not specified
     full_table, path = _create_partitioned_dataset(tempdir)