This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new 44aa829  ARROW-9288: [C++][Dataset] Fix PartitioningFactory with dictionary encoding for HivePartioning
44aa829 is described below

commit 44aa8292605bf7484ae73b289055482e399e90d0
Author: Joris Van den Bossche <jorisvandenboss...@gmail.com>
AuthorDate: Sun Jul 12 17:58:10 2020 -0500

    ARROW-9288: [C++][Dataset] Fix PartitioningFactory with dictionary encoding for HivePartioning

    Closes #7608 from jorisvandenbossche/ARROW-9288

    Authored-by: Joris Van den Bossche <jorisvandenboss...@gmail.com>
    Signed-off-by: Wes McKinney <w...@apache.org>
---
 cpp/src/arrow/dataset/partition.cc   | 26 +++++++++++++++++++++++++-
 python/pyarrow/tests/test_dataset.py | 29 +++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 744e9dd..2a2ecdf 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -317,6 +317,16 @@ class KeyValuePartitioningInspectImpl {
     return ::arrow::schema(std::move(fields));
   }
 
+  std::vector<std::string> FieldNames() {
+    std::vector<std::string> names;
+    names.reserve(name_to_index_.size());
+
+    for (auto kv : name_to_index_) {
+      names.push_back(kv.first);
+    }
+    return names;
+  }
+
  private:
   std::unordered_map<std::string, int> name_to_index_;
   std::vector<std::set<std::string>> values_;
@@ -657,15 +667,29 @@ class HivePartitioningFactory : public PartitioningFactory {
       }
     }
 
+    field_names_ = impl.FieldNames();
     return impl.Finish(&dictionaries_);
   }
 
   Result<std::shared_ptr<Partitioning>> Finish(
       const std::shared_ptr<Schema>& schema) const override {
-    return std::shared_ptr<Partitioning>(new HivePartitioning(schema, dictionaries_));
+    if (dictionaries_.empty()) {
+      return std::make_shared<HivePartitioning>(schema, dictionaries_);
+    } else {
+      for (FieldRef ref : field_names_) {
+        // ensure all of field_names_ are present in schema
+        RETURN_NOT_OK(ref.FindOne(*schema).status());
+      }
+
+      // drop fields which aren't in field_names_
+      auto out_schema = SchemaFromColumnNames(schema, field_names_);
+
+      return std::make_shared<HivePartitioning>(std::move(out_schema), dictionaries_);
+    }
   }
 
  private:
+  std::vector<std::string> field_names_;
   ArrayVector dictionaries_;
   PartitioningFactoryOptions options_;
 };
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 1c348f4..428547c 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1484,6 +1484,35 @@ def test_open_dataset_non_existing_file():
         ds.dataset('file:i-am-not-existing.parquet', format='parquet')
 
 
+@pytest.mark.parquet
+@pytest.mark.parametrize('partitioning', ["directory", "hive"])
+def test_open_dataset_partitioned_dictionary_type(tempdir, partitioning):
+    # ARROW-9288
+    import pyarrow.parquet as pq
+    table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
+
+    path = tempdir / "dataset"
+    path.mkdir()
+
+    for part in ["A", "B", "C"]:
+        fmt = "{}" if partitioning == "directory" else "part={}"
+        part = path / fmt.format(part)
+        part.mkdir()
+        pq.write_table(table, part / "test.parquet")
+
+    if partitioning == "directory":
+        part = ds.DirectoryPartitioning.discover(
+            ["part"], max_partition_dictionary_size=-1)
+    else:
+        part = ds.HivePartitioning.discover(max_partition_dictionary_size=-1)
+
+    dataset = ds.dataset(str(path), partitioning=part)
+    expected_schema = table.schema.append(
+        pa.field("part", pa.dictionary(pa.int32(), pa.string()))
+    )
+    assert dataset.schema.equals(expected_schema)
+
+
 @pytest.fixture
 def s3_example_simple(s3_connection, s3_server):
     from pyarrow.fs import FileSystem
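
For reference, a minimal sketch (not part of the commit) of the user-facing behavior the new test exercises: when dictionary encoding is requested via max_partition_dictionary_size=-1, discovering a Hive-partitioned dataset should now produce a dictionary-typed partition field rather than failing at Finish. The directory layout and the "/data/dataset" path below are illustrative; the "part" field name follows the test above.

    import pyarrow.dataset as ds

    # Hypothetical on-disk layout:
    #   /data/dataset/part=A/test.parquet
    #   /data/dataset/part=B/test.parquet
    #   /data/dataset/part=C/test.parquet

    # Request dictionary encoding for discovered partition values
    factory = ds.HivePartitioning.discover(max_partition_dictionary_size=-1)
    dataset = ds.dataset("/data/dataset", partitioning=factory)

    # The inferred "part" field is dictionary-encoded, e.g.
    # dictionary<values=string, indices=int32>
    print(dataset.schema.field("part").type)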