Repository: arrow
Updated Branches:
  refs/heads/master e8bc1fe3b -> 121e82682
ARROW-361: Python: Support reading a column-selection from Parquet files

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #197 from xhochy/ARROW-361 and squashes the following commits:

c1fb939 [Uwe L. Korn] Cache column indices
0c32213 [Uwe L. Korn] ARROW-361: Python: Support reading a column-selection from Parquet files


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/121e8268
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/121e8268
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/121e8268

Branch: refs/heads/master
Commit: 121e82682344b04bdb26edf16344a9fb2cee240c
Parents: e8bc1fe
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Nov 6 16:08:44 2016 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sun Nov 6 16:08:44 2016 -0500

----------------------------------------------------------------------
 python/pyarrow/includes/parquet.pxd  | 25 ++++++++++++---
 python/pyarrow/parquet.pyx           | 53 ++++++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 16 ++++++++++
 3 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/121e8268/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 754eecc..57c35ba 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
 from pyarrow.includes.libarrow_io cimport ReadableFileInterface
 
 
@@ -32,6 +32,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
     cdef cppclass PrimitiveNode(Node):
         pass
 
+    cdef cppclass ColumnPath:
+        c_string ToDotString()
+
 cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
     enum ParquetVersion" parquet::ParquetVersion::type":
         PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
@@ -44,13 +47,14 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
         LZO" parquet::Compression::LZO"
         BROTLI" parquet::Compression::BROTLI"
 
+    cdef cppclass ColumnDescriptor:
+        shared_ptr[ColumnPath] path()
+
     cdef cppclass SchemaDescriptor:
+        const ColumnDescriptor* Column(int i)
         shared_ptr[Node] schema()
         GroupNode* group()
 
-    cdef cppclass ColumnDescriptor:
-        pass
-
 
 cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass ColumnReader:
@@ -80,10 +84,21 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass RowGroupReader:
         pass
 
+    cdef cppclass FileMetaData:
+        uint32_t size()
+        int num_columns()
+        int64_t num_rows()
+        int num_row_groups()
+        int32_t version()
+        const c_string created_by()
+        int num_schema_elements()
+        const SchemaDescriptor* schema()
+
     cdef cppclass ParquetFileReader:
         # TODO: Some default arguments are missing
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
+        const FileMetaData* metadata();
 
 
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -124,7 +139,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     cdef cppclass FileReader:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
+        CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
+        const ParquetFileReader* parquet_reader();
 
 
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:


http://git-wip-us.apache.org/repos/asf/arrow/blob/121e8268/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index a56c1e1..2152f89 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -24,6 +24,7 @@ from pyarrow.includes.parquet cimport *
 from pyarrow.includes.libarrow_io cimport ReadableFileInterface
 cimport pyarrow.includes.pyarrow as pyarrow
 
+from pyarrow.array cimport Array
 from pyarrow.compat import tobytes
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
@@ -43,6 +44,7 @@ cdef class ParquetReader:
     cdef:
         ParquetAllocator allocator
         unique_ptr[FileReader] reader
+        column_idx_map
 
     def __cinit__(self):
         self.allocator.set_pool(default_memory_pool())
@@ -76,11 +78,55 @@ cdef class ParquetReader:
         table.init(ctable)
         return table
 
+    def column_name_idx(self, column_name):
+        """
+        Find the matching index of a column in the schema.
+
+        Parameter
+        ---------
+        column_name: str
+            Name of the column, separation of nesting levels is done via ".".
+
+        Returns
+        -------
+        column_idx: int
+            Integer index of the position of the column
+        """
+        cdef:
+            const FileMetaData* metadata = self.reader.get().parquet_reader().metadata()
+            int i = 0
+
+        if self.column_idx_map is None:
+            self.column_idx_map = {}
+            for i in range(0, metadata.num_columns()):
+                self.column_idx_map[str(metadata.schema().Column(i).path().get().ToDotString())] = i
+
+        return self.column_idx_map[column_name]
+
+    def read_column(self, int column_index):
+        cdef:
+            Array array = Array()
+            shared_ptr[CArray] carray
+
+        with nogil:
+            check_status(self.reader.get().ReadFlatColumn(column_index, &carray))
+
+        array.init(carray)
+        return array
+
 
 def read_table(source, columns=None):
     """
     Read a Table from Parquet format
 
+    Parameters
+    ----------
+    source: str or pyarrow.io.NativeFile
+        Readable source. For passing Python file objects or byte buffers, see
+        pyarrow.io.PythonFileInterface or pyarrow.io.BytesReader.
+    columns: list
+        If not None, only these columns will be read from the file.
+
     Returns
     -------
     pyarrow.table.Table
@@ -93,7 +139,12 @@ def read_table(source, columns=None):
     elif isinstance(source, NativeFile):
         reader.open_native_file(source)
 
-    return reader.read_all()
+    if columns is None:
+        return reader.read_all()
+    else:
+        column_idxs = [reader.column_name_idx(column) for column in columns]
+        arrays = [reader.read_column(column_idx) for column_idx in column_idxs]
+        return Table.from_arrays(columns, arrays)
 
 
 def write_table(table, filename, chunk_size=None, version=None,


http://git-wip-us.apache.org/repos/asf/arrow/blob/121e8268/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 922ad3a..c1d44ce 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -116,6 +116,22 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
     pdt.assert_frame_equal(df, df_read)
 
 @parquet
+def test_pandas_column_selection(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16)
+    })
+    filename = tmpdir.join('pandas_rountrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+    A.parquet.write_table(arrow_table, filename.strpath)
+    table_read = pq.read_table(filename.strpath, columns=['uint8'])
+    df_read = table_read.to_pandas()
+
+    pdt.assert_frame_equal(df[['uint8']], df_read)
+
+@parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
     np.random.seed(0)
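
For reference, a minimal usage sketch of the column selection introduced by this patch, modeled on the new test case above. It assumes the 2016-era pyarrow API used in this commit (A.from_pandas_dataframe, A.parquet.write_table, pq.read_table); 'example.parquet' is an arbitrary placeholder path.

import numpy as np
import pandas as pd

import pyarrow as A
import pyarrow.parquet as pq

# Build a small two-column table and write it with the Arrow Parquet writer.
df = pd.DataFrame({
    'a': np.arange(100, dtype=np.int64),
    'b': np.arange(100, dtype=np.float64),
})
arrow_table = A.from_pandas_dataframe(df)
A.parquet.write_table(arrow_table, 'example.parquet')

# Read back only column 'a'; with columns=None the whole table is read.
# Nested fields are addressed via dotted paths, as resolved by column_name_idx().
table = pq.read_table('example.parquet', columns=['a'])
print(table.to_pandas())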