Repository: arrow
Updated Branches:
  refs/heads/master ec3261782 -> 5aea3a3d9


ARROW-1287: [Python] Implement whence argument for pyarrow.NativeFile.seek

I still need to validate this against the use case in https://github.com/dask/fastparquet/issues/188

Author: Wes McKinney <[email protected]>

Closes #907 from wesm/ARROW-1287 and squashes the following commits:

933f3f6d [Wes McKinney] Add testing script for checking third-party library against pyarrow.HdfsClient
423ca87b [Wes McKinney] Implement whence argument for pyarrow.NativeFile.seek


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5aea3a3d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5aea3a3d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5aea3a3d

Branch: refs/heads/master
Commit: 5aea3a3d9340928e2d9f19c7fce4dcd4688dbee1
Parents: ec32617
Author: Wes McKinney <[email protected]>
Authored: Sat Jul 29 10:48:44 2017 -0400
Committer: Wes McKinney <[email protected]>
Committed: Sat Jul 29 10:48:44 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/io.pxi               | 58 ++++++++++++++++++++++++++++++--
 python/pyarrow/tests/conftest.py    |  4 ++-
 python/pyarrow/tests/test_io.py     |  9 +++++
 python/pyarrow/tests/test_plasma.py | 15 ++++-----
 python/testing/parquet_interop.py   | 53 +++++++++++++++++++++++++++++
 5 files changed, 127 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5aea3a3d/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 181b0b1..9e4e907 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -106,6 +106,9 @@ cdef class NativeFile:
             raise IOError("file not open")
 
     def size(self):
+        """
+        Return file size
+        """
         cdef int64_t size
         self._assert_readable()
         with nogil:
@@ -113,6 +116,9 @@ cdef class NativeFile:
         return size
 
     def tell(self):
+        """
+        Return current stream position
+        """
         cdef int64_t position
         with nogil:
             if self.is_readable:
@@ -121,10 +127,46 @@ cdef class NativeFile:
                 check_status(self.wr_file.get().Tell(&position))
         return position
 
-    def seek(self, int64_t position):
+    def seek(self, int64_t position, int whence=0):
+        """
+        Change current file stream position
+
+        Parameters
+        ----------
+        position : int
+            Byte offset, interpreted relative to value of whence argument
+        whence : int, default 0
+            Point of reference for seek offset
+
+        Notes
+        -----
+        Values of whence:
+        * 0 -- start of stream (the default); offset should be zero or positive
+        * 1 -- current stream position; offset may be negative
+        * 2 -- end of stream; offset is usually negative
+
+        Returns
+        -------
+        new_position : the new absolute stream position
+        """
+        cdef int64_t offset
         self._assert_readable()
         with nogil:
-            check_status(self.rd_file.get().Seek(position))
+            if whence == 0:
+                offset = position
+            elif whence == 1:
+                check_status(self.rd_file.get().Tell(&offset))
+                offset = offset + position
+            elif whence == 2:
+                check_status(self.rd_file.get().GetSize(&offset))
+                offset = offset + position
+            else:
+                with gil:
+                    raise ValueError("Invalid value of whence: {0}"
+                                     .format(whence))
+            check_status(self.rd_file.get().Seek(offset))
+
+        return self.tell()
 
     def write(self, data):
         """
@@ -144,6 +186,18 @@ cdef class NativeFile:
             check_status(self.wr_file.get().Write(buf, bufsize))
 
     def read(self, nbytes=None):
+        """
+        Read indicated number of bytes from file, or read all remaining bytes
+        if no argument passed
+
+        Parameters
+        ----------
+        nbytes : int, default None
+
+        Returns
+        -------
+        data : bytes
+        """
         cdef:
             int64_t c_nbytes
             int64_t bytes_read = 0

http://git-wip-us.apache.org/repos/asf/arrow/blob/5aea3a3d/python/pyarrow/tests/conftest.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 21288e4..f2d67f6 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -33,12 +33,14 @@ try:
 except ImportError:
     pass
 
+
 try:
-    import pyarrow.plasma as plasma
+    import pyarrow.plasma as plasma  # noqa
     defaults['plasma'] = True
 except ImportError:
     pass
 
+
 def pytest_configure(config):
     pass
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5aea3a3d/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 835f508..c81a048 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -323,6 +323,15 @@ def _check_native_file_reader(FACTORY, sample_data):
     assert f.tell() == len(data) + 1
     assert f.read(5) == b''
 
+    # Test whence argument of seek, ARROW-1287
+    assert f.seek(3) == 3
+    assert f.seek(3, os.SEEK_CUR) == 6
+    assert f.tell() == 6
+
+    ex_length = len(data) - 2
+    assert f.seek(-2, os.SEEK_END) == ex_length
+    assert f.tell() == ex_length
+
 
 def test_memory_map_reader(sample_disk_data):
     _check_native_file_reader(pa.memory_map, sample_disk_data)

http://git-wip-us.apache.org/repos/asf/arrow/blob/5aea3a3d/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 8f8d5b5..e168d9f 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -19,22 +19,20 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import glob
 import numpy as np
 import os
 import pytest
 import random
 import signal
 import subprocess
-import sys
 import time
-import unittest
 
 import pyarrow as pa
 import pandas as pd
 
 DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
 
+
 def random_name():
     return str(random.randint(0, 99999999))
 
@@ -160,7 +158,7 @@ class TestPlasmaClient(object):
 
     def teardown_method(self, test_method):
         # Check that the Plasma store is still alive.
-        assert self.p.poll() == None
+        assert self.p.poll() is None
         # Kill the plasma store process.
         if os.getenv("PLASMA_VALGRIND") == "1":
             self.p.send_signal(signal.SIGTERM)
@@ -227,7 +225,7 @@ class TestPlasmaClient(object):
                 self.plasma_client.create(object_id, length,
                                           generate_metadata(length))
             # TODO(pcm): Introduce a more specific error type here.
-            except pa.lib.ArrowException as e:
+            except pa.lib.ArrowException:
                 pass
             else:
                 assert False
@@ -270,7 +268,6 @@ class TestPlasmaClient(object):
                     assert results[i] is None
 
     def test_store_arrow_objects(self):
-        import pyarrow.plasma as plasma
         data = np.random.randn(10, 4)
         # Write an arrow object.
         object_id = random_object_id()
@@ -334,7 +331,7 @@ class TestPlasmaClient(object):
                                                     partial_size,
                                                     size - partial_size)
             # TODO(pcm): More specific error here.
-            except pa.lib.ArrowException as e:
+            except pa.lib.ArrowException:
                 pass
             else:
                 # For some reason the above didn't throw an exception, so fail.
@@ -368,7 +365,7 @@ class TestPlasmaClient(object):
         fake_object_ids = [random_object_id() for _ in range(100)]
         real_object_ids = [random_object_id() for _ in range(100)]
         for object_id in real_object_ids:
-            assert self.plasma_client.contains(object_id) == False
+            assert self.plasma_client.contains(object_id) is False
             self.plasma_client.create(object_id, 100)
             self.plasma_client.seal(object_id)
             assert self.plasma_client.contains(object_id)
@@ -383,7 +380,7 @@ class TestPlasmaClient(object):
         try:
             self.plasma_client.hash(object_id1)
             # TODO(pcm): Introduce a more specific error type here
-        except pa.lib.ArrowException as e:
+        except pa.lib.ArrowException:
             pass
         else:
             assert False

http://git-wip-us.apache.org/repos/asf/arrow/blob/5aea3a3d/python/testing/parquet_interop.py
----------------------------------------------------------------------
diff --git a/python/testing/parquet_interop.py b/python/testing/parquet_interop.py
new file mode 100644
index 0000000..ba2eb6f
--- /dev/null
+++ b/python/testing/parquet_interop.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+
+import fastparquet
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pandas.util.testing as tm
+
+
+def hdfs_test_client(driver='libhdfs'):
+    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
+    user = os.environ['ARROW_HDFS_TEST_USER']
+    try:
+        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
+    except ValueError:
+        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
+                         'an integer')
+
+    return pa.HdfsClient(host, port, user, driver=driver)
+
+
+def test_fastparquet_read_with_hdfs():
+    fs = hdfs_test_client()
+
+    df = tm.makeDataFrame()
+    table = pa.Table.from_pandas(df)
+
+    path = '/tmp/testing.parquet'
+    with fs.open(path, 'wb') as f:
+        pq.write_table(table, f)
+
+    parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)
+
+    result = parquet_file.to_pandas()
+    tm.assert_frame_equal(result, df)

Reply via email to