This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d7fbb2  ARROW-3380: [Python] Support reading gzipped CSV files
2d7fbb2 is described below

commit 2d7fbb255db973280c0abce2693441df31b1c9e1
Author: Antoine Pitrou <[email protected]>
AuthorDate: Sat Oct 20 10:21:18 2018 -0400

    ARROW-3380: [Python] Support reading gzipped CSV files
    
    Also works for other bundled compression types.
    
    Based on PR #2777.
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #2786 from pitrou/ARROW-3380-gzipped-csv-read and squashes the following commits:
    
    9a2244f84 <Antoine Pitrou> ARROW-3380:  Support reading gzipped CSV files
---
 cpp/src/arrow/io/api.h               |  1 +
 python/pyarrow/_csv.pyx              |  6 ++---
 python/pyarrow/includes/libarrow.pxd | 10 ++++++++
 python/pyarrow/io.pxi                | 44 ++++++++++++++++++++++++++++++++++++
 python/pyarrow/ipc.pxi               |  6 ++---
 python/pyarrow/lib.pxd               |  2 ++
 python/pyarrow/tests/test_csv.py     | 31 +++++++++++++++++++++++++
 7 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/cpp/src/arrow/io/api.h b/cpp/src/arrow/io/api.h
index 9f26d95..0d5742a 100644
--- a/cpp/src/arrow/io/api.h
+++ b/cpp/src/arrow/io/api.h
@@ -18,6 +18,7 @@
 #ifndef ARROW_IO_API_H
 #define ARROW_IO_API_H
 
+#include "arrow/io/compressed.h"
 #include "arrow/io/file.h"
 #include "arrow/io/hdfs.h"
 #include "arrow/io/interfaces.h"
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index cc69862..8005f44 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -22,7 +22,7 @@
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 from pyarrow.lib cimport (check_status, MemoryPool, maybe_unbox_memory_pool,
-                          pyarrow_wrap_table, get_reader)
+                          pyarrow_wrap_table, get_input_stream)
 
 
 cdef unsigned char _single_char(s) except 0:
@@ -149,10 +149,8 @@ cdef class ParseOptions:
 
 
 cdef _get_reader(input_file, shared_ptr[InputStream]* out):
-    cdef shared_ptr[RandomAccessFile] result
     use_memory_map = False
-    get_reader(input_file, use_memory_map, &result)
-    out[0] = <shared_ptr[InputStream]> result
+    get_input_stream(input_file, use_memory_map, out)
 
 
 cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2a69446..c53b4ad 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -663,6 +663,16 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
 
         int file_descriptor()
 
+    cdef cppclass CompressedInputStream(InputStream):
+        @staticmethod
+        CStatus Make(CMemoryPool* pool, CCodec* codec,
+                     shared_ptr[InputStream] raw,
+                     shared_ptr[CompressedInputStream]* out)
+
+        @staticmethod
+        CStatus Make(CCodec* codec, shared_ptr[InputStream] raw,
+                     shared_ptr[CompressedInputStream]* out)
+
     # ----------------------------------------------------------------------
     # HDFS
 
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 2e934da..ca14346 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1055,6 +1055,39 @@ cdef get_reader(object source, c_bool use_memory_map,
                         .format(type(source)))
 
 
+cdef get_input_stream(object source, c_bool use_memory_map,
+                      shared_ptr[InputStream]* out):
+    """
+    Like get_reader(), but can automatically decompress, and returns
+    an InputStream.
+    """
+    cdef:
+        NativeFile nf
+        shared_ptr[RandomAccessFile] random_access_file
+        shared_ptr[InputStream] input_stream
+        shared_ptr[CompressedInputStream] compressed_stream
+        CompressionType compression_type = CompressionType_UNCOMPRESSED
+        unique_ptr[CCodec] codec
+
+    try:
+        source_path = _stringify_path(source)
+    except TypeError:
+        pass
+    else:
+        compression_type = _get_compression_type_by_filename(source_path)
+
+    get_reader(source, use_memory_map, &random_access_file)
+    input_stream = <shared_ptr[InputStream]> random_access_file
+
+    if compression_type != CompressionType_UNCOMPRESSED:
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CompressedInputStream.Make(codec.get(), input_stream,
+                                                &compressed_stream))
+        input_stream = <shared_ptr[InputStream]> compressed_stream
+
+    out[0] = input_stream
+
+
 cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     cdef NativeFile nf
 
@@ -1099,6 +1132,17 @@ cdef CompressionType _get_compression_type(object name):
                          .format(str(name)))
 
 
+cdef CompressionType _get_compression_type_by_filename(str filename):
+    if filename.endswith('.gz'):
+        return CompressionType_GZIP
+    elif filename.endswith('.lz4'):
+        return CompressionType_LZ4
+    elif filename.endswith('.zst'):
+        return CompressionType_ZSTD
+    else:
+        return CompressionType_UNCOMPRESSED
+
+
 def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
     """
     Compress pyarrow.Buffer or Python object supporting the buffer (memoryview)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 0331fba..a65b7f7 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -118,7 +118,7 @@ cdef class MessageReader:
         cdef MessageReader result = MessageReader.__new__(MessageReader)
         cdef shared_ptr[InputStream] in_stream
         cdef unique_ptr[CMessageReader] reader
-        get_input_stream(source, &in_stream)
+        _get_input_stream(source, &in_stream)
         with nogil:
             reader = CMessageReader.Open(in_stream)
             result.reader.reset(reader.release())
@@ -227,7 +227,7 @@ cdef class _RecordBatchWriter:
         self.closed = True
 
 
-cdef get_input_stream(object source, shared_ptr[InputStream]* out):
+cdef _get_input_stream(object source, shared_ptr[InputStream]* out):
     cdef:
         shared_ptr[RandomAccessFile] file_handle
 
@@ -253,7 +253,7 @@ cdef class _RecordBatchReader:
         pass
 
     def _open(self, source):
-        get_input_stream(source, &self.in_stream)
+        _get_input_stream(source, &self.in_stream)
         with nogil:
             check_status(CRecordBatchStreamReader.Open(
                 self.in_stream.get(), &self.reader))
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 3c2935a..46089de 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -369,6 +369,8 @@ cdef class NativeFile:
     cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
     cdef write_handle(self, shared_ptr[OutputStream]* file)
 
+cdef get_input_stream(object source, c_bool use_memory_map,
+                      shared_ptr[InputStream]* reader)
 cdef get_reader(object source, c_bool use_memory_map,
                 shared_ptr[RandomAccessFile]* reader)
 cdef get_writer(object source, shared_ptr[OutputStream]* writer)
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 1a69fb9..e3ea92a 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -15,9 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import gzip
 import io
 import itertools
+import os
+import shutil
 import string
+import tempfile
 import unittest
 
 import pytest
@@ -257,3 +261,30 @@ class TestParallelCSVRead(BaseTestCSVRead, 
unittest.TestCase):
         table = read_csv(*args, **kwargs)
         table._validate()
         return table
+
+
+class BaseTestCompressedCSVRead:
+
+    def setUp(self):
+        self.tmpdir = tempfile.mkdtemp(prefix='arrow-csv-test-')
+
+    def tearDown(self):
+        shutil.rmtree(self.tmpdir)
+
+    def test_random_csv(self):
+        csv, expected = make_random_csv(num_cols=2, num_rows=100)
+        csv_path = os.path.join(self.tmpdir, self.csv_filename)
+        self.write_file(csv_path, csv)
+        table = read_csv(csv_path)
+        table._validate()
+        assert table.schema == expected.schema
+        assert table.equals(expected)
+        assert table.to_pydict() == expected.to_pydict()
+
+
+class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
+    csv_filename = "compressed.csv.gz"
+
+    def write_file(self, path, contents):
+        with gzip.open(path, 'wb', 3) as f:
+            f.write(contents)

Reply via email to