Repository: arrow
Updated Branches:
  refs/heads/master e8bc1fe3b -> 121e82682


ARROW-361: Python: Support reading a column-selection from Parquet files

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #197 from xhochy/ARROW-361 and squashes the following commits:

c1fb939 [Uwe L. Korn] Cache column indices
0c32213 [Uwe L. Korn] ARROW-361: Python: Support reading a column-selection from Parquet files


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/121e8268
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/121e8268
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/121e8268

Branch: refs/heads/master
Commit: 121e82682344b04bdb26edf16344a9fb2cee240c
Parents: e8bc1fe
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Nov 6 16:08:44 2016 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sun Nov 6 16:08:44 2016 -0500

----------------------------------------------------------------------
 python/pyarrow/includes/parquet.pxd  | 25 ++++++++++++---
 python/pyarrow/parquet.pyx           | 53 ++++++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 16 ++++++++++
 3 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
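
In effect, read_table() gains a columns argument: each requested name
(nesting levels separated by '.') is resolved to a flat column index via the
file metadata, read with ReadFlatColumn, and the resulting arrays are
assembled into a Table. A minimal usage sketch; the file and column names
here are illustrative, not part of this commit:

    import pyarrow.parquet as pq

    # Read only the 'uint8' column; other columns are never materialized.
    table = pq.read_table('example.parquet', columns=['uint8'])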


http://git-wip-us.apache.org/repos/asf/arrow/blob/121e8268/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 754eecc..57c35ba 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
 from pyarrow.includes.libarrow_io cimport ReadableFileInterface
 
 
@@ -32,6 +32,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
   cdef cppclass PrimitiveNode(Node):
     pass
 
+  cdef cppclass ColumnPath:
+    c_string ToDotString()
+
 cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
   enum ParquetVersion" parquet::ParquetVersion::type":
       PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
@@ -44,13 +47,14 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
       LZO" parquet::Compression::LZO"
       BROTLI" parquet::Compression::BROTLI"
 
+  cdef cppclass ColumnDescriptor:
+    shared_ptr[ColumnPath] path()
+
   cdef cppclass SchemaDescriptor:
+    const ColumnDescriptor* Column(int i)
     shared_ptr[Node] schema()
     GroupNode* group()
 
-  cdef cppclass ColumnDescriptor:
-    pass
-
 
 cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass ColumnReader:
@@ -80,10 +84,21 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass RowGroupReader:
         pass
 
+    cdef cppclass FileMetaData:
+        uint32_t size()
+        int num_columns()
+        int64_t num_rows()
+        int num_row_groups()
+        int32_t version()
+        const c_string created_by()
+        int num_schema_elements()
+        const SchemaDescriptor* schema()
+
     cdef cppclass ParquetFileReader:
         # TODO: Some default arguments are missing
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
+        const FileMetaData* metadata();
 
 
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -124,7 +139,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
     cdef cppclass FileReader:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
+        CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
+        const ParquetFileReader* parquet_reader();
 
 
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:

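The pxd additions expose parquet-cpp's FileMetaData, ColumnDescriptor, and
ColumnPath to Cython so that dotted column names can be mapped to flat column
indices. A rough sketch of how the new declarations compose, assuming
'reader' holds a unique_ptr[FileReader] as in parquet.pyx below:

    cdef const FileMetaData* meta = reader.get().parquet_reader().metadata()
    cdef int i
    for i in range(meta.num_columns()):
        # ToDotString() joins nesting levels with '.', e.g. 'a.b.c'
        name = meta.schema().Column(i).path().get().ToDotString()
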
http://git-wip-us.apache.org/repos/asf/arrow/blob/121e8268/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index a56c1e1..2152f89 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -24,6 +24,7 @@ from pyarrow.includes.parquet cimport *
 from pyarrow.includes.libarrow_io cimport ReadableFileInterface
 cimport pyarrow.includes.pyarrow as pyarrow
 
+from pyarrow.array cimport Array
 from pyarrow.compat import tobytes
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
@@ -43,6 +44,7 @@ cdef class ParquetReader:
     cdef:
         ParquetAllocator allocator
         unique_ptr[FileReader] reader
+        column_idx_map
 
     def __cinit__(self):
         self.allocator.set_pool(default_memory_pool())
@@ -76,11 +78,55 @@ cdef class ParquetReader:
         table.init(ctable)
         return table
 
+    def column_name_idx(self, column_name):
+        """
+        Find the matching index of a column in the schema.
+
+        Parameters
+        ----------
+        column_name: str
+            Name of the column; nesting levels are separated by ".".
+
+        Returns
+        -------
+        column_idx: int
+            Position of the column in the flattened schema.
+        """
+        cdef:
+            const FileMetaData* metadata = self.reader.get().parquet_reader().metadata()
+            int i = 0
+
+        if self.column_idx_map is None:
+            self.column_idx_map = {}
+            for i in range(0, metadata.num_columns()):
+                self.column_idx_map[str(metadata.schema().Column(i).path().get().ToDotString())] = i
+
+        return self.column_idx_map[column_name]
+
+    def read_column(self, int column_index):
+        cdef:
+            Array array = Array()
+            shared_ptr[CArray] carray
+
+        with nogil:
+            check_status(self.reader.get().ReadFlatColumn(column_index, &carray))
+
+        array.init(carray)
+        return array
+
 
 def read_table(source, columns=None):
     """
     Read a Table from Parquet format
 
+    Parameters
+    ----------
+    source: str or pyarrow.io.NativeFile
+        Readable source. For passing Python file objects or byte buffers, see
+        pyarrow.io.PythonFileInterface or pyarrow.io.BytesReader.
+    columns: list
+        If not None, only these columns will be read from the file.
+
     Returns
     -------
     pyarrow.table.Table
@@ -93,7 +139,12 @@ def read_table(source, columns=None):
     elif isinstance(source, NativeFile):
         reader.open_native_file(source)
 
-    return reader.read_all()
+    if columns is None:
+        return reader.read_all()
+    else:
+        column_idxs = [reader.column_name_idx(column) for column in columns]
+        arrays = [reader.read_column(column_idx) for column_idx in column_idxs]
+        return Table.from_arrays(columns, arrays)
 
 
 def write_table(table, filename, chunk_size=None, version=None,

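The column-selection branch of read_table() thus reduces to three steps:
resolve names to indices, read each column as an Array, and build a Table
from the requested names and arrays. A pure-Python sketch of that flow,
where 'reader' stands in for the ParquetReader instance created above:

    def read_selected(reader, columns):
        # Resolve dotted names against the cached schema index map.
        idxs = [reader.column_name_idx(name) for name in columns]
        # Read each flat column individually as an Array.
        arrays = [reader.read_column(idx) for idx in idxs]
        return Table.from_arrays(columns, arrays)
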
http://git-wip-us.apache.org/repos/asf/arrow/blob/121e8268/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 922ad3a..c1d44ce 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -116,6 +116,22 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
     pdt.assert_frame_equal(df, df_read)
 
 @parquet
+def test_pandas_column_selection(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16)
+    })
+    filename = tmpdir.join('pandas_roundtrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+    A.parquet.write_table(arrow_table, filename.strpath)
+    table_read = pq.read_table(filename.strpath, columns=['uint8'])
+    df_read = table_read.to_pandas()
+
+    pdt.assert_frame_equal(df[['uint8']], df_read)
+
+@parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
     np.random.seed(0)
