This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2d7fbb2 ARROW-3380: [Python] Support reading gzipped CSV files
2d7fbb2 is described below
commit 2d7fbb255db973280c0abce2693441df31b1c9e1
Author: Antoine Pitrou <[email protected]>
AuthorDate: Sat Oct 20 10:21:18 2018 -0400
ARROW-3380: [Python] Support reading gzipped CSV files
Also works for the other bundled compression types (LZ4 and ZSTD), which are detected from the ".lz4" and ".zst" file extensions.
Based on PR #2777.
Author: Antoine Pitrou <[email protected]>
Closes #2786 from pitrou/ARROW-3380-gzipped-csv-read and squashes the
following commits:
9a2244f84 <Antoine Pitrou> ARROW-3380: Support reading gzipped CSV files
---
cpp/src/arrow/io/api.h | 1 +
python/pyarrow/_csv.pyx | 6 ++---
python/pyarrow/includes/libarrow.pxd | 10 ++++++++
python/pyarrow/io.pxi | 44 ++++++++++++++++++++++++++++++++++++
python/pyarrow/ipc.pxi | 6 ++---
python/pyarrow/lib.pxd | 2 ++
python/pyarrow/tests/test_csv.py | 31 +++++++++++++++++++++++++
7 files changed, 93 insertions(+), 7 deletions(-)
diff --git a/cpp/src/arrow/io/api.h b/cpp/src/arrow/io/api.h
index 9f26d95..0d5742a 100644
--- a/cpp/src/arrow/io/api.h
+++ b/cpp/src/arrow/io/api.h
@@ -18,6 +18,7 @@
#ifndef ARROW_IO_API_H
#define ARROW_IO_API_H
+#include "arrow/io/compressed.h"
#include "arrow/io/file.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/interfaces.h"
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index cc69862..8005f44 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -22,7 +22,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (check_status, MemoryPool, maybe_unbox_memory_pool,
- pyarrow_wrap_table, get_reader)
+ pyarrow_wrap_table, get_input_stream)
cdef unsigned char _single_char(s) except 0:
@@ -149,10 +149,8 @@ cdef class ParseOptions:
cdef _get_reader(input_file, shared_ptr[InputStream]* out):
- cdef shared_ptr[RandomAccessFile] result
use_memory_map = False
- get_reader(input_file, use_memory_map, &result)
- out[0] = <shared_ptr[InputStream]> result
+ # get_input_stream() fills *out* with an InputStream and, for paths whose
+ # extension names a known compression format, wraps it to decompress.
+ get_input_stream(input_file, use_memory_map, out)
cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index 2a69446..c53b4ad 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -663,6 +663,16 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io"
nogil:
int file_descriptor()
+ # Declarations for arrow::io::CompressedInputStream, an InputStream that
+ # reads from a wrapped "raw" InputStream through a CCodec (exposed via
+ # "arrow/io/api.h", which now includes "arrow/io/compressed.h").
+ # NOTE(review): both Make() overloads take a raw CCodec*, so the codec is
+ # presumably not owned by the stream; confirm whether it must outlive it.
+ cdef cppclass CompressedInputStream(InputStream):
+ @staticmethod
+ CStatus Make(CMemoryPool* pool, CCodec* codec,
+ shared_ptr[InputStream] raw,
+ shared_ptr[CompressedInputStream]* out)
+
+ # Overload without an explicit pool (presumably the default pool).
+ @staticmethod
+ CStatus Make(CCodec* codec, shared_ptr[InputStream] raw,
+ shared_ptr[CompressedInputStream]* out)
+
# ----------------------------------------------------------------------
# HDFS
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 2e934da..ca14346 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1055,6 +1055,39 @@ cdef get_reader(object source, c_bool use_memory_map,
.format(type(source)))
+cdef get_input_stream(object source, c_bool use_memory_map,
+                      shared_ptr[InputStream]* out):
+    """
+    Like get_reader(), but returns an InputStream and can automatically
+    decompress the data.
+
+    Decompression is applied when *source* is a path whose extension is
+    recognized by _get_compression_type_by_filename(); otherwise the
+    stream returned by get_reader() is used unchanged.
+
+    Parameters
+    ----------
+    source : object
+        Anything accepted by get_reader().
+    use_memory_map : bool
+        Forwarded to get_reader().
+    out : shared_ptr[InputStream]*
+        Receives the (possibly decompressing) input stream.
+    """
+    cdef:
+        shared_ptr[RandomAccessFile] random_access_file
+        shared_ptr[InputStream] input_stream
+        shared_ptr[CompressedInputStream] compressed_stream
+        CompressionType compression_type = CompressionType_UNCOMPRESSED
+        unique_ptr[CCodec] codec
+
+    # Only sources that _stringify_path() accepts carry a file name to
+    # inspect; anything it rejects with TypeError (presumably file-like
+    # objects) is read without decompression.
+    try:
+        source_path = _stringify_path(source)
+    except TypeError:
+        pass
+    else:
+        compression_type = _get_compression_type_by_filename(source_path)
+
+    get_reader(source, use_memory_map, &random_access_file)
+    input_stream = <shared_ptr[InputStream]> random_access_file
+
+    if compression_type != CompressionType_UNCOMPRESSED:
+        check_status(CCodec.Create(compression_type, &codec))
+        # NOTE(review): Make() receives a raw pointer, while `codec` is a
+        # local unique_ptr freed when this function returns; confirm that
+        # CompressedInputStream does not use the codec after construction.
+        check_status(CompressedInputStream.Make(codec.get(), input_stream,
+                                                &compressed_stream))
+        input_stream = <shared_ptr[InputStream]> compressed_stream
+
+    out[0] = input_stream
+
+
cdef get_writer(object source, shared_ptr[OutputStream]* writer):
cdef NativeFile nf
@@ -1099,6 +1132,17 @@ cdef CompressionType _get_compression_type(object name):
.format(str(name)))
+cdef CompressionType _get_compression_type_by_filename(object filename):
+    """
+    Guess a file's compression type from the extension of its name.
+
+    Accepts str, unicode or bytes file names. Matching is case-insensitive,
+    so "data.csv.GZ" is detected as gzip. Returns
+    CompressionType_UNCOMPRESSED when no known extension matches.
+    """
+    # The argument is typed `object` rather than `str`: a `str` parameter
+    # rejects unicode paths on Python 2 (and bytes paths) with a TypeError
+    # raised at the call site, outside the caller's path-detection try block.
+    if isinstance(filename, bytes):
+        # Extensions are ASCII; latin-1 decoding cannot fail and keeps them.
+        filename = filename.decode('latin-1')
+    name = filename.lower()
+    if name.endswith('.gz'):
+        return CompressionType_GZIP
+    elif name.endswith('.lz4'):
+        return CompressionType_LZ4
+    elif name.endswith('.zst'):
+        return CompressionType_ZSTD
+    else:
+        return CompressionType_UNCOMPRESSED
+
+
def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
"""
Compress pyarrow.Buffer or Python object supporting the buffer (memoryview)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 0331fba..a65b7f7 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -118,7 +118,7 @@ cdef class MessageReader:
cdef MessageReader result = MessageReader.__new__(MessageReader)
cdef shared_ptr[InputStream] in_stream
cdef unique_ptr[CMessageReader] reader
- get_input_stream(source, &in_stream)
+ _get_input_stream(source, &in_stream)
with nogil:
reader = CMessageReader.Open(in_stream)
result.reader.reset(reader.release())
@@ -227,7 +227,7 @@ cdef class _RecordBatchWriter:
self.closed = True
-cdef get_input_stream(object source, shared_ptr[InputStream]* out):
+cdef _get_input_stream(object source, shared_ptr[InputStream]* out):
cdef:
shared_ptr[RandomAccessFile] file_handle
@@ -253,7 +253,7 @@ cdef class _RecordBatchReader:
pass
def _open(self, source):
- get_input_stream(source, &self.in_stream)
+ _get_input_stream(source, &self.in_stream)
with nogil:
check_status(CRecordBatchStreamReader.Open(
self.in_stream.get(), &self.reader))
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 3c2935a..46089de 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -369,6 +369,8 @@ cdef class NativeFile:
cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
cdef write_handle(self, shared_ptr[OutputStream]* file)
+cdef get_input_stream(object source, c_bool use_memory_map,
+ shared_ptr[InputStream]* reader)
cdef get_reader(object source, c_bool use_memory_map,
shared_ptr[RandomAccessFile]* reader)
cdef get_writer(object source, shared_ptr[OutputStream]* writer)
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 1a69fb9..e3ea92a 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -15,9 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+import gzip
import io
import itertools
+import os
+import shutil
import string
+import tempfile
import unittest
import pytest
@@ -257,3 +261,30 @@ class TestParallelCSVRead(BaseTestCSVRead,
unittest.TestCase):
table = read_csv(*args, **kwargs)
table._validate()
return table
+
+
+class BaseTestCompressedCSVRead:
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp(prefix='arrow-csv-test-')
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def test_random_csv(self):
+ csv, expected = make_random_csv(num_cols=2, num_rows=100)
+ csv_path = os.path.join(self.tmpdir, self.csv_filename)
+ self.write_file(csv_path, csv)
+ table = read_csv(csv_path)
+ table._validate()
+ assert table.schema == expected.schema
+ assert table.equals(expected)
+ assert table.to_pydict() == expected.to_pydict()
+
+
+class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
+ csv_filename = "compressed.csv.gz"
+
+ def write_file(self, path, contents):
+ with gzip.open(path, 'wb', 3) as f:
+ f.write(contents)