This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c017a63 ARROW-1035: [Python] Add streaming dataframe reconstruction benchmark
c017a63 is described below
commit c017a6346f95d58f2ea14b5c2bcd3d7fd7321316
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Feb 27 10:38:39 2018 +0100
ARROW-1035: [Python] Add streaming dataframe reconstruction benchmark
Author: Antoine Pitrou <[email protected]>
Closes #1665 from pitrou/ARROW-1035-streaming-benchmark and squashes the
following commits:
32b1956 <Antoine Pitrou> ARROW-1035: Add streaming dataframe reconstruction benchmark
---
python/benchmarks/common.py | 31 ++++++++++++++-----
python/benchmarks/streaming.py | 67 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 90 insertions(+), 8 deletions(-)
diff --git a/python/benchmarks/common.py b/python/benchmarks/common.py
index 7dd42fd..b205ba5 100644
--- a/python/benchmarks/common.py
+++ b/python/benchmarks/common.py
@@ -23,13 +23,21 @@ import unicodedata
import numpy as np
+KILOBYTE = 1 << 10
+MEGABYTE = KILOBYTE * KILOBYTE
+
+
def _multiplicate_sequence(base, target_size):
q, r = divmod(target_size, len(base))
return [base] * q + [base[:r]]
-def get_random_bytes(n):
- rnd = np.random.RandomState(42)
+def get_random_bytes(n, *, seed=42):
+ """
+ Generate a random bytes object of size *n*.
+ Note the result might be compressible.
+ """
+ rnd = np.random.RandomState(seed)
# Computing a huge random bytestring can be costly, so we get at most
# 100KB and duplicate the result as needed
base_size = 100003
@@ -43,22 +51,25 @@ def get_random_bytes(n):
return result
-def get_random_ascii(n):
- arr = np.frombuffer(get_random_bytes(n), dtype=np.int8) & 0x7f
+def get_random_ascii(n, *, seed=42):
+ """
+ Get a random ASCII-only unicode string of size *n*.
+ """
+ arr = np.frombuffer(get_random_bytes(n, seed=seed), dtype=np.int8) & 0x7f
result, _ = codecs.ascii_decode(arr)
assert isinstance(result, str)
assert len(result) == n
return result
-def _random_unicode_letters(n):
+def _random_unicode_letters(n, *, seed=42):
"""
Generate a string of random unicode letters (slow).
"""
def _get_more_candidates():
return rnd.randint(0, sys.maxunicode, size=n).tolist()
- rnd = np.random.RandomState(42)
+ rnd = np.random.RandomState(seed)
out = []
candidates = []
@@ -75,8 +86,12 @@ def _random_unicode_letters(n):
_1024_random_unicode_letters = _random_unicode_letters(1024)
-def get_random_unicode(n):
- indices = np.frombuffer(get_random_bytes(n * 2), dtype=np.int16) & 1023
+def get_random_unicode(n, *, seed=42):
+ """
+ Get a random non-ASCII unicode string of size *n*.
+ """
+ indices = np.frombuffer(get_random_bytes(n * 2, seed=seed),
+ dtype=np.int16) & 1023
unicode_arr = np.array(_1024_random_unicode_letters)[indices]
result = ''.join(unicode_arr.tolist())
diff --git a/python/benchmarks/streaming.py b/python/benchmarks/streaming.py
new file mode 100644
index 0000000..be7fda4
--- /dev/null
+++ b/python/benchmarks/streaming.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+
+from . import common
+from .common import KILOBYTE, MEGABYTE
+
+
+def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
+ rowsize = total_size // nchunks // ncols
+ assert rowsize % dtype.itemsize == 0
+ return [pd.DataFrame({
+ 'c' + str(col): np.frombuffer(
+            common.get_random_bytes(rowsize, seed=col + 997 * chunk)).view(dtype)
+ for col in range(ncols)
+ })
+ for chunk in range(nchunks)]
+
+
+class StreamReader(object):
+ """
+ Benchmark in-memory streaming to a Pandas dataframe.
+ """
+ total_size = 64 * MEGABYTE
+ ncols = 8
+ chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]
+
+ param_names = ['chunk_size']
+ params = [chunk_sizes]
+
+ def setup(self, chunk_size):
+ # Note we're careful to stream different chunks instead of
+ # streaming N times the same chunk, so that we avoid operating
+ # entirely out of L1/L2.
+ chunks = generate_chunks(self.total_size,
+ nchunks=self.total_size // chunk_size,
+ ncols=self.ncols)
+ batches = [pa.RecordBatch.from_pandas(df)
+ for df in chunks]
+ schema = batches[0].schema
+ sink = pa.BufferOutputStream()
+ stream_writer = pa.RecordBatchStreamWriter(sink, schema)
+ for batch in batches:
+ stream_writer.write_batch(batch)
+ self.source = sink.get_result()
+
+ def time_read_to_dataframe(self, *args):
+ reader = pa.RecordBatchStreamReader(self.source)
+ table = reader.read_all()
+ df = table.to_pandas()
--
To stop receiving notification emails like this one, please contact
[email protected].