Hello all,
Attached is code (plus tests) for allocating aligned arrays -- I think
this addresses all the requests in this thread, with regard to
allowing for different kinds of alignment. Thanks Robert and Anne for
your help and suggestions. Hopefully this will be useful.
The core is a function for allocating arrays with totally arbitrary
alignment along each dimension (e.g. you could allocate an 10x20 array
of uint16's where each uint16 is aligned to 4-byte boundaries and each
row of 20 uint16's is aligned to 32-byte boundaries, and the entire
buffer is aligned to a 128-byte boundary.) I've also included helper
functions for two common cases: when you want everything aligned to a
particular multiple (every element, row, etc. as well as the whole
buffer), and when you want an array where the rows (second-fastest
moving index) are so aligned (this was my original use case, for fast
image-blitting).
Zach
def aligned_empty(shape, dtype, dim_alignments, array_alignment):
'''Allocate an empty array with the given shape and dtype, where
the array
buffer starts at a memory address evenly-divisible by
array_alignment, and
where items along each dimension are offset from the first item on
that
dimension by a byte offset that is an integer multiple of the
corresponding
value in the dim_alignments tuple.
Example: To allocate a 20x30 array of floats32s, where individual
data
elements are aligned to 16-bute boundaries, each row is aligned to
a 64-byte
boundary, and the array's buffer starts on a 128-byte boundary, call:
aligned_empty((20,30), numpy.float32, (64, 16), 128)
'''
def aligned_rows_empty(shape, dtype, alignment, order='C'):
'''Return an array where the rows (second-fastest-moving index) are
aligned
to byte boundaries evenly-divisible by 'alignment'. If 'order' is
'C', then
the indexing is such that the fastest-moving index is the last one;
if the
order is 'F', then the fastest-moving index is the first.'''
def aligned_elements_empty(shape, dtype, alignment, order='C'):
'''Return an array where each element is aligned to byte boundaries
evenly-
divisible by 'alignment'.'''
import numpy
def aligned_empty(shape, dtype, dim_alignments, array_alignment):
'''Allocate an empty array with the given shape and dtype, where the array
buffer starts at a memory address evenly-divisible by array_alignment, and
where items along each dimension are offset from the first item on that
dimension by a byte offset that is an integer multiple of the corresponding
value in the dim_alignments tuple.
Example: To allocate a 20x30 array of floats32s, where individual data
elements are aligned to 16-bute boundaries, each row is aligned to a 64-byte
boundary, and the array's buffer starts on a 128-byte boundary, call:
aligned_empty((20,30), numpy.float32, (64, 16), 128)
'''
if len(shape) != len(dim_alignments):
raise ValueError('Alignments must be provided for each dimension.')
dtype = numpy.dtype(dtype)
strides = []
current_size = dtype.itemsize
for width, alignment in zip(shape[::-1], dim_alignments[::-1]):
# build up new strides array in reverse, so that the fastest-moving index
# is the last (C-ish indexing, but not necessarily contiguous)
current_size += (alignment - current_size % alignment) % alignment
strides.append(current_size)
current_size *= width
strides = strides[::-1]
total_bytes = current_size + (array_alignment - 1)
buffer = numpy.empty(total_bytes, dtype=numpy.uint8)
address = buffer.ctypes.data
offset = (array_alignment - address % array_alignment) % array_alignment
return numpy.ndarray(shape=shape, dtype=dtype, buffer=buffer,
strides=strides, offset=offset)
def aligned_rows_empty(shape, dtype, alignment, order='C'):
'''Return an array where the rows (second-fastest-moving index) are aligned
to byte boundaries evenly-divisible by 'alignment'. If 'order' is 'C', then
the indexing is such that the fastest-moving index is the last one; if the
order is 'F', then the fastest-moving index is the first.'''
if len(shape) < 2:
raise ValueError('Need at least a 2D array to align rows.')
order = order.upper()
if order not in ('C', 'F'):
raise ValueError("Order must be 'C' or 'F'.")
dim_alignments = [1 for dim in shape]
dim_alignments[-2] = alignment
if order == 'F':
shape = shape[::-1]
return aligned_empty(shape, dtype, dim_alignments, alignment).T
else:
return aligned_empty(shape, dtype, dim_alignments, alignment)
def aligned_elements_empty(shape, dtype, alignment, order='C'):
'''Return an array where each element is aligned to byte boundaries evenly-
divisible by 'alignment'.'''
order = order.upper()
if order not in ('C', 'F'):
raise ValueError("Order must be 'C' or 'F'.")
dim_alignments = [alignment for dim in shape]
if order == 'F':
shape = shape[::-1]
return aligned_empty(shape, dtype, dim_alignments, alignment).T
else:
return aligned_empty(shape, dtype, dim_alignments, alignment)
def _address(element):
return element.__array_interface__['data'][0]
def _test_alignment(array, dim_alignments, array_alignment):
base_address = _address(array)
assert(base_address % array_alignment == 0)
_test_alignment_recursive(array, dim_alignments, base_address)
def _test_alignment_recursive(array, dim_alignments, base_address):
for i in range(len(array)):
if len(array.shape) == 1:
elem = array[i:i+1] # slice so that we never get an array scalar
else:
elem = array[i]
assert((_address(elem) - base_address) % dim_alignments[0] == 0)
if len(array.shape) > 1:
_test_alignment_recursive(elem, dim_alignments[1:], _address(elem))
def test():
array = aligned_empty((13,71,55), numpy.uint16, (7,9,68), 133)
_test_alignment(array, (7,9,68), 133)
array = aligned_empty((10,20), numpy.uint8, (64,16), 256)
_test_alignment(array, (64,16), 256)
aligned_elements = aligned_elements_empty((100,), numpy.float32, 7)
for i in range(100):
assert(_address(aligned_elements[i:i+1]) % 7 == 0)
aligned_elements = aligned_elements_empty((10,7,12), numpy.float64, 128)
for i,j,k in numpy.ndindex(10,7,12):
assert(_address(aligned_elements[i,j,k:k+1]) % 128 == 0)
aligned_rows = aligned_rows_empty((100,200), numpy.uint16, 4, 'F')
for i in range(200):
assert(_address(aligned_rows[:,i]) % 4 == 0)
aligned_rows = aligned_rows_empty((10,20,100,80), numpy.uint16, 9, 'C')
for i in range(100):
assert(_address(aligned_rows[...,i,:]) % 9 == 0)
if __name__ == '__main__':
test()
_______________________________________________
Numpy-discussion mailing list
Numpy-discussion@scipy.org
http://projects.scipy.org/mailman/listinfo/numpy-discussion