Hello all,

Attached is code (plus tests) for allocating aligned arrays -- I think this addresses all the requests in this thread, with regard to allowing for different kinds of alignment. Thanks Robert and Anne for your help and suggestions. Hopefully this will be useful.

The core is a function for allocating arrays with totally arbitrary alignment along each dimension (e.g. you could allocate an 10x20 array of uint16's where each uint16 is aligned to 4-byte boundaries and each row of 20 uint16's is aligned to 32-byte boundaries, and the entire buffer is aligned to a 128-byte boundary.) I've also included helper functions for two common cases: when you want everything aligned to a particular multiple (every element, row, etc. as well as the whole buffer), and when you want an array where the rows (second-fastest moving index) are so aligned (this was my original use case, for fast image-blitting).

Zach


def aligned_empty(shape, dtype, dim_alignments, array_alignment):
'''Allocate an empty array with the given shape and dtype, where the array buffer starts at a memory address evenly-divisible by array_alignment, and where items along each dimension are offset from the first item on that dimension by a byte offset that is an integer multiple of the corresponding
  value in the dim_alignments tuple.

Example: To allocate a 20x30 array of floats32s, where individual data elements are aligned to 16-bute boundaries, each row is aligned to a 64-byte
  boundary, and the array's buffer starts on a 128-byte boundary, call:
    aligned_empty((20,30), numpy.float32, (64, 16), 128)
  '''

def aligned_rows_empty(shape, dtype, alignment, order='C'):
'''Return an array where the rows (second-fastest-moving index) are aligned to byte boundaries evenly-divisible by 'alignment'. If 'order' is 'C', then the indexing is such that the fastest-moving index is the last one; if the
  order is 'F', then the fastest-moving index is the first.'''

def aligned_elements_empty(shape, dtype, alignment, order='C'):
'''Return an array where each element is aligned to byte boundaries evenly-
  divisible by 'alignment'.'''


import numpy

def aligned_empty(shape, dtype, dim_alignments, array_alignment):
  '''Allocate an empty array with the given shape and dtype, where the array
  buffer starts at a memory address evenly-divisible by array_alignment, and
  where items along each dimension are offset from the first item on that
  dimension by a byte offset that is an integer multiple of the corresponding
  value in the dim_alignments tuple.
  
  Example: To allocate a 20x30 array of floats32s, where individual data
  elements are aligned to 16-bute boundaries, each row is aligned to a 64-byte
  boundary, and the array's buffer starts on a 128-byte boundary, call:
    aligned_empty((20,30), numpy.float32, (64, 16), 128)
  '''
  if len(shape) != len(dim_alignments):
    raise ValueError('Alignments must be provided for each dimension.')
  dtype = numpy.dtype(dtype)
  strides = []
  current_size = dtype.itemsize
  for width, alignment in zip(shape[::-1], dim_alignments[::-1]):
    # build up new strides array in reverse, so that the fastest-moving index
    # is the last (C-ish indexing, but not necessarily contiguous)
    current_size += (alignment - current_size % alignment) % alignment
    strides.append(current_size)
    current_size *= width
  strides = strides[::-1]
  total_bytes = current_size + (array_alignment - 1)
  buffer = numpy.empty(total_bytes, dtype=numpy.uint8)
  address = buffer.ctypes.data
  offset = (array_alignment - address % array_alignment) % array_alignment
  return numpy.ndarray(shape=shape, dtype=dtype, buffer=buffer,
    strides=strides, offset=offset)

def aligned_rows_empty(shape, dtype, alignment, order='C'):
  '''Return an array where the rows (second-fastest-moving index) are aligned
  to byte boundaries evenly-divisible by 'alignment'. If 'order' is 'C', then
  the indexing is such that the fastest-moving index is the last one; if the
  order is 'F', then the fastest-moving index is the first.'''
  if len(shape) < 2:
    raise ValueError('Need at least a 2D array to align rows.')
  order = order.upper()
  if order not in ('C', 'F'):
    raise ValueError("Order must be 'C' or 'F'.")
  dim_alignments = [1 for dim in shape]
  dim_alignments[-2] = alignment
  if order == 'F':
    shape = shape[::-1]
    return aligned_empty(shape, dtype, dim_alignments, alignment).T
  else:
    return aligned_empty(shape, dtype, dim_alignments, alignment)

def aligned_elements_empty(shape, dtype, alignment, order='C'):
  '''Return an array where each element is aligned to byte boundaries evenly-
  divisible by 'alignment'.'''
  order = order.upper()
  if order not in ('C', 'F'):
    raise ValueError("Order must be 'C' or 'F'.")
  dim_alignments = [alignment for dim in shape]
  if order == 'F':
    shape = shape[::-1]
    return aligned_empty(shape, dtype, dim_alignments, alignment).T
  else:
    return aligned_empty(shape, dtype, dim_alignments, alignment)


def _address(element):
  return element.__array_interface__['data'][0]

def _test_alignment(array, dim_alignments, array_alignment):
  base_address = _address(array)
  assert(base_address % array_alignment == 0)
  _test_alignment_recursive(array, dim_alignments, base_address)
  
def _test_alignment_recursive(array, dim_alignments, base_address):
  for i in range(len(array)):
    if len(array.shape) == 1:
      elem = array[i:i+1] # slice so that we never get an array scalar
    else:
      elem = array[i]
    assert((_address(elem) - base_address) % dim_alignments[0] == 0)
    if len(array.shape) > 1:
      _test_alignment_recursive(elem, dim_alignments[1:], _address(elem))

def test():
  array = aligned_empty((13,71,55), numpy.uint16, (7,9,68), 133)
  _test_alignment(array, (7,9,68), 133)
  array = aligned_empty((10,20), numpy.uint8, (64,16), 256)
  _test_alignment(array, (64,16), 256)
  aligned_elements = aligned_elements_empty((100,), numpy.float32, 7)
  for i in range(100):
    assert(_address(aligned_elements[i:i+1]) % 7 == 0)
  aligned_elements = aligned_elements_empty((10,7,12), numpy.float64, 128)
  for i,j,k in numpy.ndindex(10,7,12):
    assert(_address(aligned_elements[i,j,k:k+1]) % 128 == 0)
  aligned_rows = aligned_rows_empty((100,200), numpy.uint16, 4, 'F')
  for i in range(200):
    assert(_address(aligned_rows[:,i]) % 4 == 0)
  aligned_rows = aligned_rows_empty((10,20,100,80), numpy.uint16, 9, 'C')
  for i in range(100):
    assert(_address(aligned_rows[...,i,:]) % 9 == 0)

if __name__ == '__main__':
  test()

_______________________________________________
Numpy-discussion mailing list
Numpy-discussion@scipy.org
http://projects.scipy.org/mailman/listinfo/numpy-discussion

Reply via email to