This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c017a63  ARROW-1035: [Python] Add streaming dataframe reconstruction benchmark
c017a63 is described below

commit c017a6346f95d58f2ea14b5c2bcd3d7fd7321316
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Feb 27 10:38:39 2018 +0100

    ARROW-1035: [Python] Add streaming dataframe reconstruction benchmark
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #1665 from pitrou/ARROW-1035-streaming-benchmark and squashes the following commits:
    
    32b1956 <Antoine Pitrou> ARROW-1035:  Add streaming dataframe reconstruction benchmark
---
 python/benchmarks/common.py    | 31 ++++++++++++++-----
 python/benchmarks/streaming.py | 67 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 8 deletions(-)
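
A minimal way to exercise the new benchmark by hand, bypassing the ASV
harness that normally drives it. This is an illustrative sketch only: it
assumes the benchmarks package is importable, e.g. when run from the
python/ directory of the repository.

    # Hypothetical manual run of the new StreamReader benchmark.
    from benchmarks.common import MEGABYTE          # assumed import path
    from benchmarks.streaming import StreamReader

    bench = StreamReader()
    bench.setup(chunk_size=8 * MEGABYTE)    # one of the parametrized sizes
    bench.time_read_to_dataframe()          # the operation ASV would time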

diff --git a/python/benchmarks/common.py b/python/benchmarks/common.py
index 7dd42fd..b205ba5 100644
--- a/python/benchmarks/common.py
+++ b/python/benchmarks/common.py
@@ -23,13 +23,21 @@ import unicodedata
 import numpy as np
 
 
+KILOBYTE = 1 << 10
+MEGABYTE = KILOBYTE * KILOBYTE
+
+
 def _multiplicate_sequence(base, target_size):
     q, r = divmod(target_size, len(base))
     return [base] * q + [base[:r]]
 
 
-def get_random_bytes(n):
-    rnd = np.random.RandomState(42)
+def get_random_bytes(n, *, seed=42):
+    """
+    Generate a random bytes object of size *n*.
+    Note the result might be compressible.
+    """
+    rnd = np.random.RandomState(seed)
     # Computing a huge random bytestring can be costly, so we get at most
     # 100KB and duplicate the result as needed
     base_size = 100003
@@ -43,22 +51,25 @@ def get_random_bytes(n):
     return result
 
 
-def get_random_ascii(n):
-    arr = np.frombuffer(get_random_bytes(n), dtype=np.int8) & 0x7f
+def get_random_ascii(n, *, seed=42):
+    """
+    Get a random ASCII-only unicode string of size *n*.
+    """
+    arr = np.frombuffer(get_random_bytes(n, seed=seed), dtype=np.int8) & 0x7f
     result, _ = codecs.ascii_decode(arr)
     assert isinstance(result, str)
     assert len(result) == n
     return result
 
 
-def _random_unicode_letters(n):
+def _random_unicode_letters(n, *, seed=42):
     """
     Generate a string of random unicode letters (slow).
     """
     def _get_more_candidates():
         return rnd.randint(0, sys.maxunicode, size=n).tolist()
 
-    rnd = np.random.RandomState(42)
+    rnd = np.random.RandomState(seed)
     out = []
     candidates = []
 
@@ -75,8 +86,12 @@ def _random_unicode_letters(n):
 _1024_random_unicode_letters = _random_unicode_letters(1024)
 
 
-def get_random_unicode(n):
-    indices = np.frombuffer(get_random_bytes(n * 2), dtype=np.int16) & 1023
+def get_random_unicode(n, *, seed=42):
+    """
+    Get a random non-ASCII unicode string of size *n*.
+    """
+    indices = np.frombuffer(get_random_bytes(n * 2, seed=seed),
+                            dtype=np.int16) & 1023
     unicode_arr = np.array(_1024_random_unicode_letters)[indices]
 
     result = ''.join(unicode_arr.tolist())
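
For illustration, the effect of the new keyword-only seed parameter: the
same seed reproduces the same payload, while different seeds yield
distinct data (this is what streaming.py below relies on to generate
distinct chunks). A sketch, assuming benchmarks.common is importable:

    from benchmarks.common import get_random_bytes  # assumed import path

    a = get_random_bytes(1000)            # default seed=42
    b = get_random_bytes(1000, seed=7)    # different seed, different bytes
    assert a == get_random_bytes(1000)    # deterministic for a given seed
    assert a != b
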
diff --git a/python/benchmarks/streaming.py b/python/benchmarks/streaming.py
new file mode 100644
index 0000000..be7fda4
--- /dev/null
+++ b/python/benchmarks/streaming.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+
+from . import common
+from .common import KILOBYTE, MEGABYTE
+
+
+def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
+    rowsize = total_size // nchunks // ncols
+    assert rowsize % dtype.itemsize == 0
+    return [pd.DataFrame({
+            'c' + str(col): np.frombuffer(
+                common.get_random_bytes(rowsize, seed=col + 997 * chunk)).view(dtype)
+            for col in range(ncols)
+        })
+        for chunk in range(nchunks)]
+
+
+class StreamReader(object):
+    """
+    Benchmark in-memory streaming to a Pandas dataframe.
+    """
+    total_size = 64 * MEGABYTE
+    ncols = 8
+    chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]
+
+    param_names = ['chunk_size']
+    params = [chunk_sizes]
+
+    def setup(self, chunk_size):
+        # Note we're careful to stream different chunks instead of
+        # streaming N times the same chunk, so that we avoid operating
+    # entirely out of the CPU's L1/L2 caches.
+        chunks = generate_chunks(self.total_size,
+                                 nchunks=self.total_size // chunk_size,
+                                 ncols=self.ncols)
+        batches = [pa.RecordBatch.from_pandas(df)
+                   for df in chunks]
+        schema = batches[0].schema
+        sink = pa.BufferOutputStream()
+        stream_writer = pa.RecordBatchStreamWriter(sink, schema)
+        for batch in batches:
+            stream_writer.write_batch(batch)
+        self.source = sink.get_result()
+
+    def time_read_to_dataframe(self, *args):
+        reader = pa.RecordBatchStreamReader(self.source)
+        table = reader.read_all()
+        df = table.to_pandas()
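
A self-contained sketch of the write/read round-trip this benchmark
times, using the same pyarrow calls as streaming.py above (get_result()
is the buffer-retrieval method the diff itself uses); the sample data
here is made up for illustration:

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'c0': np.arange(1024, dtype=np.int64)})
    batch = pa.RecordBatch.from_pandas(df)

    # Write a record batch stream into an in-memory buffer...
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    source = sink.get_result()

    # ...then read it back and reconstruct a pandas DataFrame.
    reader = pa.RecordBatchStreamReader(source)
    result = reader.read_all().to_pandas()
    assert result['c0'].tolist() == df['c0'].tolist()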

-- 
To stop receiving notification emails like this one, please contact
[email protected].
