Hi Andreas, I've recoded much of the scan code. Also I've been having problems with my git setup with proxy. So I'm just attaching the source code as is with a sample test code.
I'm of the opinion that a single exclusive scan be part of the implementation. Simply because the inclusive scan is just a step away (by reading and adding the nth element). Regards Nithin from pycuda import driver, compiler, gpuarray, tools, reduction from math import log,ceil,floor import warnings exclusive_scan_source = """ #define SUM(a, b) (%(scan_operation)s) %(preamble)s __global__ void %(name)s(%(data_type)s *result,const %(data_type)s *material %(if_part_sum)s ,%(data_type)s *partial_sum ,const int n %(if_tail)s ,const int block_index ) { extern __shared__ %(data_type)s shared_array[]; int source_node_index = threadIdx.x; // Size of shared array that can contain input data %(if_main)s const int full_array_size = n; %(if_tail)s const int full_array_size = 1<<((int)(floor(log2((float)n))+1)); %(if_main)s const int block_index = blockIdx.x; const int block_offset = 2*block_index*blockDim.x; int tree_node_distance = 1; int target_node_index = threadIdx.x + (n/2); %(if_tail)s if (source_node_index < n) { shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = material[source_node_index + block_offset]; } %(if_tail)s else {shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = %(neutral)s;} %(if_tail)s if (target_node_index < n) { shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = material[target_node_index + block_offset]; } %(if_tail)s else {shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = %(neutral)s;} // Travel upwards, from leaves to the root of the tree // During each loop time distance between nodes increases two times, // and number of nodes decreases two times for (int number_of_nodes = full_array_size>>1; number_of_nodes > 0; number_of_nodes >>= 1) { __syncthreads(); if (threadIdx.x < number_of_nodes) { int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1; int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1; source_node_index += SHARED_BANK_PADDING(source_node_index); target_node_index += SHARED_BANK_PADDING(target_node_index); shared_array[target_node_index] = SUM(shared_array[target_node_index], shared_array[source_node_index]); } tree_node_distance <<= 1; } if (threadIdx.x == 0) { %(if_part_sum)s partial_sum[block_index] = shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)]; shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)] = %(neutral)s; } // Travel downwards, from root to the leaves // During each loop number of nodes increases two times and // distance between nodes decreases two times for (int number_of_nodes = 1; number_of_nodes < full_array_size; number_of_nodes <<= 1) { tree_node_distance >>= 1; __syncthreads(); if (threadIdx.x < number_of_nodes) { int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1; int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1; source_node_index += SHARED_BANK_PADDING(source_node_index); target_node_index += SHARED_BANK_PADDING(target_node_index); %(data_type)s temp = shared_array[source_node_index]; shared_array[source_node_index] = shared_array[target_node_index]; shared_array[target_node_index] = SUM(shared_array[target_node_index], temp); } } __syncthreads(); %(if_tail)s if (source_node_index < n) { result[source_node_index + block_offset] = shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)]; } %(if_tail)s if (target_node_index < n) { result[target_node_index + block_offset] = shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)]; } } """ uniform_sum_source = """ #define SUM(a, b) (%(scan_operation)s) %(preamble)s __global__ void %(name)s(%(data_type)s *result,%(data_type)s *partial_sum ,const int n %(if_tail)s ,const int block_index ) { extern __shared__ %(data_type)s shared_array[]; int source_node_index = threadIdx.x; // Size of shared array that can contain input data %(if_main)s const int block_index = blockIdx.x+1; const int block_offset = 2*block_index*blockDim.x; int target_node_index = threadIdx.x + (n/2); if (threadIdx.x == 0) { shared_array[0] = partial_sum[block_index]; } __syncthreads(); // is this a bad call?? %(data_type)s prev_block_sum = shared_array[0]; %(if_tail)s if (source_node_index < n) { result[source_node_index + block_offset] = SUM(prev_block_sum,result[source_node_index + block_offset]); } %(if_tail)s if (target_node_index < n) { result[target_node_index + block_offset] = SUM(prev_block_sum,result[target_node_index + block_offset]); } } """ # TODO: check this four .. is it future proof?? padding_preamble = "\n#define SHARED_BANK_PADDING(n) ((n) >> 4)\n" #padding_preamble = "\n#define SHARED_BANK_PADDING(n) 0\n" class ExclusivePrefixKernel(object): def __init__(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): # Determine the size of used data type self.numpy_type = tools.parse_c_arg("const %s * in" % data_type).dtype item_size = self.numpy_type.itemsize # Determine the number of threads dev = driver.Context.get_device() block_size = dev.get_attribute(driver.device_attribute.MAX_THREADS_PER_BLOCK) block_dimension = dev.get_attribute(driver.device_attribute.MAX_BLOCK_DIM_X) self.block_size = min(block_size, block_dimension) # Shared memory size: two items per thread, number of threads, # size of one data item self.shared_size = self.get_block_size()*item_size # Padding to avoid bank conflicts # TODO: is this always 4?? log_num_banks = 4 self.shared_size += ((self.get_block_size() >> log_num_banks) * item_size) # Check whether we do not exceed available shared memory size max_shared_size = dev.get_attribute(driver.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK) if self.shared_size > max_shared_size: warnings.warn ("Changed shared size") self.shared_size = max_shared_size self.block_size = self.shared_size//(2*item_size) self.main_function = self.get_main_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.main_function.prepare("PPi", block=(self.block_size, 1, 1), shared=self.shared_size) self.tail_function = self.get_tail_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.tail_function.prepare("PPii", block=(self.block_size, 1, 1), shared=self.shared_size) self.main_part_sum_function = self.get_main_part_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.main_part_sum_function.prepare("PPPi", block=(self.block_size, 1, 1), shared=self.shared_size) self.tail_part_sum_function = self.get_tail_part_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.tail_part_sum_function.prepare("PPPii", block=(self.block_size, 1, 1), shared=self.shared_size) self.main_uniform_sum_function = self.get_main_uniform_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.main_uniform_sum_function.prepare("PPi", block=(self.block_size, 1, 1), shared=item_size) self.tail_uniform_sum_function = self.get_tail_uniform_sum_function(data_type, scan_operation, neutral, name, keep, options, preamble) self.tail_uniform_sum_function.prepare("PPii", block=(self.block_size, 1, 1), shared=item_size) # Use maximum available shared memory in 2.x devices # TODO: is it needed as we are more limited by number of threads? # Might be needed for large data types (?) if dev.compute_capability() >= (2, 0): cache_size = pycuda.driver.func_cache.PREFER_SHARED self.main_function.set_cache_config(cache_size) self.tail_function.set_cache_config(cache_size) self.main_part_sum_function.set_cache_config(cache_size) self.tail_part_sum_function.set_cache_config(cache_size) def get_block_size(self): return 2 * self.block_size def get_main_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "//", 'if_main': "", 'if_part_sum': "//", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_tail_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "", 'if_main': "//", 'if_part_sum': "//", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_main_part_sum_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "//", 'if_main': "", 'if_part_sum': "", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_tail_part_sum_function(self, data_type, scan_operation, neutral, name = 'prefix_kernel', keep = False, options = [], preamble = ''): src = exclusive_scan_source % { 'data_type': data_type, 'name': name, 'neutral': neutral, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "", 'if_main': "//", 'if_part_sum': "", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_main_uniform_sum_function(self, data_type, scan_operation, neutral, name = 'uniform_add_kernel', keep = False, options = [], preamble = ''): src = uniform_sum_source % { 'data_type': data_type, 'name': name, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "//", 'if_main': "", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def get_tail_uniform_sum_function(self, data_type, scan_operation, neutral, name = 'uniform_add_kernel', keep = False, options = [], preamble = ''): src = uniform_sum_source % { 'data_type': data_type, 'name': name, 'scan_operation': scan_operation, 'preamble': preamble+padding_preamble, 'if_tail': "", 'if_main': "//", } return compiler.SourceModule(src, options=options, keep=keep).get_function(name) def call_final(self,input_size,result,material): block_count = ((input_size + self.get_block_size() - 1)/(self.get_block_size())) if input_size != block_count * self.get_block_size(): if block_count > 1: self.main_function.prepared_call((block_count-1, 1), result.gpudata, material.gpudata, self.get_block_size()) self.tail_function.prepared_call((1, 1), result.gpudata, material.gpudata, input_size - (block_count - 1) * self.get_block_size(), (block_count - 1)) else: self.main_function.prepared_call((block_count, 1), result.gpudata, material.gpudata, self.get_block_size()) def call_intermediate(self,input_size,result,material,part_sum_buf): block_count = ((input_size + self.get_block_size() - 1)/(self.get_block_size())) if input_size != block_count * self.get_block_size(): if block_count > 1: self.main_part_sum_function.prepared_call((block_count-1, 1), result.gpudata, material.gpudata,part_sum_buf.gpudata, self.get_block_size()) self.tail_part_sum_function.prepared_call((1, 1), result.gpudata, material.gpudata,part_sum_buf.gpudata, input_size - (block_count - 1) * self.get_block_size(), (block_count - 1)) else: self.main_part_sum_function.prepared_call((block_count, 1), result.gpudata, material.gpudata,part_sum_buf.gpudata, self.get_block_size()) def call_uniform_add(self,input_size,result,material,part_sum_buf): block_count = ((input_size + self.get_block_size() - 1)/(self.get_block_size())) if block_count <= 1: raise Exception("This should not have happened .. nothing to be uniform added") if input_size != block_count * self.get_block_size(): block_count -= 1 if block_count > 1: self.main_uniform_sum_function.prepared_call((block_count-1, 1), result.gpudata, part_sum_buf.gpudata, self.get_block_size()) block_count += 1 self.tail_uniform_sum_function.prepared_call((1, 1), result.gpudata,part_sum_buf.gpudata, input_size - (block_count - 1) * self.get_block_size(), (block_count - 1)) else: block_count -= 1 self.main_uniform_sum_function.prepared_call((block_count, 1), result.gpudata, part_sum_buf.gpudata, self.get_block_size()) def __call__(self, *args, **kwargs): result = kwargs.get('result') if result is None: result = args[0] material = kwargs.get('material') if material is None: material = args[1] input_size = kwargs.get('size') if input_size is None: input_size = material.size part_sum_bufs = [ gpuarray.empty(int(ceil(float(input_size)/pow(2*self.block_size,i))),result.dtype) for i in range(1,int(ceil(log(input_size,2*self.block_size)))) ] callargsets = [ [input_size,result,material] ] + [ [ps_buf.size,ps_buf,ps_buf] for ps_buf in part_sum_bufs] for i,ps_buf in enumerate(part_sum_bufs): callargsets[i] += [ps_buf] for callargset in callargsets[0:-1]: self.call_intermediate(*callargset) self.call_final(*callargsets[-1]) callargsets.reverse() for callargset in callargsets[1:]: self.call_uniform_add(*callargset) if __name__=="__main__": import time import pycuda.autoinit import numpy as np sz = 512*512*512/8 a = gpuarray.empty(sz, dtype=np.int32) b = a a.fill(np.int32(1)) krnl = ExclusivePrefixKernel('int', 'a+b', '0') time_start = time.time() krnl(a, b, sz) print "kernel finished in %f seconds "%(time.time() - time_start) print "a = ",a.get() print "b = ",b.get() On 21 February 2011 23:27, Andreas Kloeckner <li...@informa.tiker.net> wrote: > Hi Nithin, > > On Mon, 21 Feb 2011 22:27:04 +0530, nithin s <nithin19...@gmail.com> wrote: >> I believe there are some errors in the implementation. Im >> basing my comments only on the exclusive version. >> >> The final call to finish adds the "each" of the partial sums to >> every element of the result. That is to say that if my array size was >> 1024x1024 and each thread block worked on 1024 elements. My partial >> sum array would be as large as 1024 and the last(or second to last) >> block would have to iterate 1024 sums to produce the result. >> >> Isn't this wrong? shouldn't the partial sums be prefix scanned >> and then each block adds the associated partial sum o/p to each of its >> elements. That way the loop for (int i = 1; i <= blockIdx.x; i++) is >> not needed. > > We know it's broken at the moment--that's why it's currently living on a > branch and not in mainline PyCUDA yet. Patches welcome. > > Andreas > > _______________________________________________ PyCUDA mailing list PyCUDA@tiker.net http://lists.tiker.net/listinfo/pycuda