Hi, Am attaching an updated version.
@Andreas: the floating pt ops are not part of the code anymore. Some code beautification and consistent nomenclature. Regards Nithin On 22 February 2011 15:17, nithin s <nithin19...@gmail.com> wrote: > Hi Andreas > > On 22 February 2011 05:45, Andreas Kloeckner <li...@informa.tiker.net> wrote: >> Hi Nithin, >> >> two requests: >> >> - can you please resend this as an attachment? It's hard to fish out of >> the text of an email. > Done >> >> - please avoid using floating point functions (log, ceil, floor) in >> integer contexts. PyCUDA comes with a bitlog2 function that does what >> you need, I think. > > bitlog2 alone doesn't cut it. This is becase the log is taken to the > base 2*block_size. block_size need not be a power of 2 in a few rare > cases. This is because if shared mem is limited then the block_size = > shared_mem/item_size. Now Item size need not be a power of 2 (If we > are willing to support arbitrary types.. though there is a limitation > .. since dtype needs to be known for partial sum array > allocations..which is presumably numpy.dtype). > > This will mess up the estimate. I could recode this by writing a > routine by repeatedly dividing and calculating the necessary int ciel. > I feel the current expression is cleaner and concise. Let me know if > you still feel otherwise. > >> >> Once I get the file posted on the PyCUDA branch, I'll write a more >> complete review. I agree with your assessment of inclusive vs exclusive >> scan. I'd say go ahead and kill the inclusive version. >> >> Tomasz, what's your take here? >> >> Andreas >> >> > > Regards > > Nithin > > > @Bryan: Tomaszs' original inclusive scan impl was based on the naive > prefix scan algorithm at http://en.wikipedia.org/wiki/Prefix_sum. This > is not particularly work efficient. I don't (yet) see a neat way to > convert the Exclusive Mark Harris version to an inclusive one.Thus I > thought it better to maintain a single exclusive prefix scan. >
from pycuda import driver, compiler, gpuarray, tools, reduction from warnings import warn exclusive_scan_source = """ #define SUM(a, b) (%(scan_operation)s) %(preamble)s __global__ void %(name)s(const %(data_type)s *i_data,%(data_type)s *o_data %(if_part_sum)s ,%(data_type)s *partial_sum ,const int n %(if_tail)s ,const int block_index ) { extern __shared__ %(data_type)s shared_array[]; int source_node_index = threadIdx.x; // Size of shared array that can contain input data %(if_main)s const int full_array_size = n; %(if_tail)s const int full_array_size = 1<<((int)(floor(log2((float)n))+1)); %(if_main)s const int block_index = blockIdx.x; const int block_offset = 2*block_index*blockDim.x; int tree_node_distance = 1; int target_node_index = threadIdx.x + (n/2); %(if_tail)s if (source_node_index < n) { shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = i_data[source_node_index + block_offset]; } %(if_tail)s else {shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = %(neutral)s;} %(if_tail)s if (target_node_index < n) { shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = i_data[target_node_index + block_offset]; } %(if_tail)s else {shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = %(neutral)s;} // Travel upwards, from leaves to the root of the tree // During each loop time distance between nodes increases two times, // and number of nodes decreases two times for (int number_of_nodes = full_array_size>>1; number_of_nodes > 0; number_of_nodes >>= 1) { __syncthreads(); if (threadIdx.x < number_of_nodes) { int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1; int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1; source_node_index += SHARED_BANK_PADDING(source_node_index); target_node_index += SHARED_BANK_PADDING(target_node_index); shared_array[target_node_index] = SUM(shared_array[target_node_index], shared_array[source_node_index]); } tree_node_distance <<= 1; } if (threadIdx.x == 0) { %(if_part_sum)s partial_sum[block_index] = shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)]; shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)] = %(neutral)s; } // Travel downwards, from root to the leaves // During each loop number of nodes increases two times and // distance between nodes decreases two times for (int number_of_nodes = 1; number_of_nodes < full_array_size; number_of_nodes <<= 1) { tree_node_distance >>= 1; __syncthreads(); if (threadIdx.x < number_of_nodes) { int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1; int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1; source_node_index += SHARED_BANK_PADDING(source_node_index); target_node_index += SHARED_BANK_PADDING(target_node_index); %(data_type)s temp = shared_array[source_node_index]; shared_array[source_node_index] = shared_array[target_node_index]; shared_array[target_node_index] = SUM(shared_array[target_node_index], temp); } } __syncthreads(); %(if_tail)s if (source_node_index < n) { o_data[source_node_index + block_offset] = shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)]; } %(if_tail)s if (target_node_index < n) { o_data[target_node_index + block_offset] = shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)]; } } """ uniform_sum_source = """ #define SUM(a, b) (%(scan_operation)s) %(preamble)s __global__ void %(name)s(%(data_type)s *o_data,%(data_type)s *partial_sum ,const int n %(if_tail)s ,const int block_index ) { extern __shared__ %(data_type)s shared_array[]; int source_node_index = threadIdx.x; // Size of shared array that can contain input data %(if_main)s const int block_index = blockIdx.x+1; const int block_offset = 2*block_index*blockDim.x; int target_node_index = threadIdx.x + (n/2); if (threadIdx.x == 0) { shared_array[0] = partial_sum[block_index]; } __syncthreads(); // is this a bad call?? %(data_type)s prev_block_sum = shared_array[0]; %(if_tail)s if (source_node_index < n) { o_data[source_node_index + block_offset] = SUM(prev_block_sum,o_data[source_node_index + block_offset]); } %(if_tail)s if (target_node_index < n) { o_data[target_node_index + block_offset] = SUM(prev_block_sum,o_data[target_node_index + block_offset]); } } """ def _div_ceil(nr, dr): return int(int(nr + dr -1)/int(dr)) def get_num_levels(n,c): l = 0 while(n != 1): l += 1 n = _div_ceil(n,c) return l def get_num_chunks(n,c,i): return _div_ceil(n,pow(c,i)) # TODO: check this four .. is it future proof?? padding_preamble = "\n#define SHARED_BANK_PADDING(n) ((n) >> 4)\n" #padding_preamble = "\n#define SHARED_BANK_PADDING(n) 0\n" class ExclusivePrefixKernel(object): def __init__(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): # Determine the size of used data type self.numpy_type = tools.parse_c_arg("const %s * in" % data_type).dtype item_size = self.numpy_type.itemsize # Determine the number of threads dev = driver.Context.get_device() block_size = dev.get_attribute(driver.device_attribute.MAX_THREADS_PER_BLOCK) block_dimension = dev.get_attribute(driver.device_attribute.MAX_BLOCK_DIM_X) self.block_size = min(block_size, block_dimension) # Shared memory size: two items per thread, number of threads, # size of one data item self.shared_size = self.get_chunk_size()*item_size # Padding to avoid bank conflicts # TODO: is this always 4?? log_num_banks = 4 self.shared_size += ((self.get_chunk_size() >> log_num_banks) * item_size) # Check whether we do not exceed available shared memory size max_shared_size = dev.get_attribute(driver.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK) if self.shared_size > max_shared_size: warn ("Changed shared size") self.shared_size = max_shared_size self.block_size = self.shared_size//(2*item_size) self.main_function = self.get_main_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.main_function.prepare("PPi", block=(self.block_size, 1, 1), shared=self.shared_size) self.tail_function = self.get_tail_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.tail_function.prepare("PPii", block=(self.block_size, 1, 1), shared=self.shared_size) self.main_part_sum_function = self.get_main_part_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.main_part_sum_function.prepare("PPPi", block=(self.block_size, 1, 1), shared=self.shared_size) self.tail_part_sum_function = self.get_tail_part_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.tail_part_sum_function.prepare("PPPii", block=(self.block_size, 1, 1), shared=self.shared_size) self.main_uniform_sum_function = self.get_main_uniform_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.main_uniform_sum_function.prepare("PPi", block=(self.block_size, 1, 1), shared=item_size) self.tail_uniform_sum_function = self.get_tail_uniform_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.tail_uniform_sum_function.prepare("PPii", block=(self.block_size, 1, 1), shared=item_size) # Use maximum available shared memory in 2.x devices # TODO: is it needed as we are more limited by number of threads? # Might be needed for large data types (?) if dev.compute_capability() >= (2, 0): cache_size = pycuda.driver.func_cache.PREFER_SHARED self.main_function.set_cache_config(cache_size) self.tail_function.set_cache_config(cache_size) self.main_part_sum_function.set_cache_config(cache_size) self.tail_part_sum_function.set_cache_config(cache_size) def get_chunk_size(self): return 2 * self.block_size def get_main_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "//", 'if_main': "", 'if_part_sum': "//", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_tail_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "", 'if_main': "//", 'if_part_sum': "//", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_main_part_sum_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "//", 'if_main': "", 'if_part_sum': "", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_tail_part_sum_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "", 'if_main': "//", 'if_part_sum': "", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_main_uniform_sum_function(self, data_type, scan_operation, neutral, name = 'uniform_add_kernel', keep = False, options = [], preamble = ''): src = uniform_sum_source % { 'data_type': data_type, 'name': name, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "//", 'if_main': "", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_tail_uniform_sum_function(self, data_type, scan_operation, neutral, name = 'uniform_add_kernel', keep = False, options = [], preamble = ''): src = uniform_sum_source % { 'data_type': data_type, 'name': name, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "", 'if_main': "//", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def call_final(self,input_size,i_data,o_data): block_count = ((input_size + self.get_chunk_size() - 1)/(self.get_chunk_size())) if input_size != block_count * self.get_chunk_size(): if block_count > 1: self.main_function.prepared_call((block_count-1, 1), i_data.gpudata, o_data.gpudata, self.get_chunk_size()) self.tail_function.prepared_call((1, 1), i_data.gpudata, o_data.gpudata, input_size - (block_count - 1) * self.get_chunk_size(), (block_count - 1)) else: self.main_function.prepared_call((block_count, 1), i_data.gpudata, o_data.gpudata, self.get_chunk_size()) def call_intermediate(self,input_size,i_data,o_data,part_sum_buf): block_count = ((input_size + self.get_chunk_size() - 1)/(self.get_chunk_size())) if input_size != block_count * self.get_chunk_size(): if block_count > 1: self.main_part_sum_function.prepared_call((block_count-1, 1), i_data.gpudata, o_data.gpudata,part_sum_buf.gpudata, self.get_chunk_size()) self.tail_part_sum_function.prepared_call((1, 1), i_data.gpudata, o_data.gpudata,part_sum_buf.gpudata, input_size - (block_count - 1) * self.get_chunk_size(), (block_count - 1)) else: self.main_part_sum_function.prepared_call((block_count, 1), i_data.gpudata, o_data.gpudata,part_sum_buf.gpudata, self.get_chunk_size()) def call_uniform_add(self,input_size,i_data,o_data,part_sum_buf): block_count = ((input_size + self.get_chunk_size() - 1)/(self.get_chunk_size())) if block_count <= 1: raise Exception("This should not have happened .. nothing to be uniform added") if input_size != block_count * self.get_chunk_size(): block_count -= 1 if block_count > 1: self.main_uniform_sum_function.prepared_call((block_count-1, 1), o_data.gpudata, part_sum_buf.gpudata, self.get_chunk_size()) block_count += 1 self.tail_uniform_sum_function.prepared_call((1, 1), o_data.gpudata,part_sum_buf.gpudata, input_size - (block_count - 1) * self.get_chunk_size(), (block_count - 1)) else: block_count -= 1 self.main_uniform_sum_function.prepared_call((block_count, 1), o_data.gpudata, part_sum_buf.gpudata, self.get_chunk_size()) def __call__(self, *args, **kwargs): i_data = kwargs.get('i_data') if i_data is None: i_data = args[0] o_data = kwargs.get('o_data') if o_data is None: o_data = args[1] n = kwargs.get('n') if n is None: n = min(i_data.size,o_data.size) part_sum_bufs = [ gpuarray.empty(get_num_chunks(n,self.get_chunk_size(),l),o_data.dtype) for l in range(1,get_num_levels(n,self.get_chunk_size())) ] callargsets = [ [n,i_data,o_data] ] + [ [ps_buf.size,ps_buf,ps_buf] for ps_buf in part_sum_bufs] for i,ps_buf in enumerate(part_sum_bufs): callargsets[i] += [ps_buf] for callargset in callargsets[0:-1]: self.call_intermediate(*callargset) self.call_final(*callargsets[-1]) for callargset in reversed(callargsets[0:-1]): self.call_uniform_add(*callargset) if __name__=="__main__": import time import pycuda.autoinit import numpy as np sz = 1024*1024*3 - 2 a = gpuarray.empty(sz, dtype=np.int32) b = a a.fill(np.int32(1)) krnl = ExclusivePrefixKernel('int', 'a+b', '0') time_start = time.time() krnl(a, b, sz) print "kernel finished in %f seconds "%(time.time() - time_start) print "a = ",a.get() print "b = ",b.get()
_______________________________________________ PyCUDA mailing list PyCUDA@tiker.net http://lists.tiker.net/listinfo/pycuda