Hi,

     I am attaching an updated version.

@Andreas: the floating-point ops are no longer part of the code.

     Also some code beautification and more consistent nomenclature.

Regards

Nithin

On 22 February 2011 15:17, nithin s <nithin19...@gmail.com> wrote:
> Hi Andreas
>
> On 22 February 2011 05:45, Andreas Kloeckner <li...@informa.tiker.net> wrote:
>> Hi Nithin,
>>
>> two requests:
>>
>> - can you please resend this as an attachment? It's hard to fish out of
>>  the text of an email.
> Done
>>
>> - please avoid using floating point functions (log, ceil, floor) in
>>  integer contexts. PyCUDA comes with a bitlog2 function that does what
>>  you need, I think.
>
> bitlog2 alone doesn't cut it, because the log is taken to the base
> 2*block_size, and block_size need not be a power of 2 in a few rare
> cases: if shared memory is limited, then block_size =
> shared_mem/item_size, and the item size need not be a power of 2 (if
> we are willing to support arbitrary types; there is a limitation
> anyway, since the dtype needs to be known for the partial-sum array
> allocations, which is presumably a numpy.dtype).
>
> This would mess up the estimate. I could recode it as a routine that
> repeatedly divides and computes the necessary integer ceiling. I feel
> the current expression is cleaner and more concise. Let me know if
> you still feel otherwise.
>
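
For reference, the integer-only routine boils down to what the
_div_ceil / get_num_levels helpers in the attached file do; roughly
the following sketch, where c stands for 2*block_size and need not be
a power of two:

def _div_ceil(nr, dr):
    # integer ceiling of nr/dr, no floating point involved
    return (nr + dr - 1) // dr

def get_num_levels(n, c):
    # ceil(log(n) to the base c), computed by repeated integer division
    levels = 0
    while n != 1:
        n = _div_ceil(n, c)
        levels += 1
    return levels
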
>>
>> Once I get the file posted on the PyCUDA branch, I'll write a more
>> complete review. I agree with your assessment of inclusive vs exclusive
>> scan. I'd say go ahead and kill the inclusive version.
>>
>> Tomasz, what's your take here?
>>
>> Andreas
>>
>>
>
> Regards
>
> Nithin
>
>
> @Bryan: Tomasz's original inclusive scan implementation was based on
> the naive prefix scan algorithm at
> http://en.wikipedia.org/wiki/Prefix_sum, which is not particularly
> work-efficient. I don't (yet) see a neat way to convert the exclusive
> Mark Harris version to an inclusive one, so I thought it better to
> maintain a single exclusive prefix scan.
>
from pycuda import driver, compiler, gpuarray, tools, reduction
from warnings import warn

exclusive_scan_source = """
            #define SUM(a, b) (%(scan_operation)s)

            %(preamble)s

            __global__ void %(name)s(const %(data_type)s *i_data,%(data_type)s *o_data
%(if_part_sum)s                     ,%(data_type)s *partial_sum
                                    ,const int n
%(if_tail)s                         ,const int block_index
                                    )
            {
                extern __shared__ %(data_type)s shared_array[];

                int source_node_index = threadIdx.x;
                
                // Size of shared array that can contain input data
%(if_main)s     const int full_array_size = n;
%(if_tail)s     const int full_array_size = 1<<((int)(floor(log2((float)n))+1));

%(if_main)s     const int block_index  = blockIdx.x;
                const int block_offset = 2*block_index*blockDim.x;
                int tree_node_distance = 1;

                int target_node_index = threadIdx.x + (n/2);
                
%(if_tail)s     if (source_node_index < n) 
                {                
                    shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] =
                    i_data[source_node_index + block_offset];
                }
%(if_tail)s     else {shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = %(neutral)s;}

%(if_tail)s     if (target_node_index < n) 
                {
                    shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] =
                    i_data[target_node_index + block_offset];
                }
%(if_tail)s     else {shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = %(neutral)s;}



                // Travel upwards, from leaves to the root of the tree
                // During each loop time distance between nodes increases two times,
                // and number of nodes decreases two times
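                //
                // Worked example (illustrative) for a 4-element sum scan
                // of [3 1 7 0]:
                //   up-sweep:    [3 1 7 0] -> [3 4 7 7] -> [3 4 7 11]
                //   clear root:                             [3 4 7  0]
                //   down-sweep:  [3 4 7 0] -> [3 0 7 4] -> [0 3 4 11]
                // i.e. the exclusive prefix sum of the input.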
                for (int number_of_nodes = full_array_size>>1; number_of_nodes > 0;
                    number_of_nodes >>= 1) 
                {
                    __syncthreads();

                    if (threadIdx.x < number_of_nodes) 
                    {
                        int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1;
                        int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1;
                        source_node_index += SHARED_BANK_PADDING(source_node_index);
                        target_node_index += SHARED_BANK_PADDING(target_node_index);

                        shared_array[target_node_index] =
                            SUM(shared_array[target_node_index],
                            shared_array[source_node_index]);
                    }
                    tree_node_distance <<= 1;
                }

                if (threadIdx.x == 0) 
                {
%(if_part_sum)s     partial_sum[block_index] = shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)];
                    shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)] = %(neutral)s;
                }

                // Travel downwards, from root to the leaves
                // During each loop number of nodes increases two times and
                // distance between nodes decreases two times
                for (int number_of_nodes = 1; number_of_nodes < full_array_size; number_of_nodes <<= 1)
                {
                    tree_node_distance >>= 1;
                    __syncthreads();

                    if (threadIdx.x < number_of_nodes) 
                    {
                        int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1;
                        int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1;
                        source_node_index += SHARED_BANK_PADDING(source_node_index);
                        target_node_index += SHARED_BANK_PADDING(target_node_index);

                        %(data_type)s temp = shared_array[source_node_index];
                        shared_array[source_node_index] = shared_array[target_node_index];
                        shared_array[target_node_index] =
                            SUM(shared_array[target_node_index], temp);
                    }
                }
                __syncthreads();
                
%(if_tail)s     if (source_node_index < n)
                {                
                    o_data[source_node_index + block_offset] =
                        shared_array[source_node_index +
                        SHARED_BANK_PADDING(source_node_index)];
                }
                
%(if_tail)s     if (target_node_index < n) 
                {
                    o_data[target_node_index + block_offset] =
                        shared_array[target_node_index +
                        SHARED_BANK_PADDING(target_node_index)];
                }
            }

"""


uniform_sum_source = """
            #define SUM(a, b) (%(scan_operation)s)

            %(preamble)s

            __global__ void %(name)s(%(data_type)s *o_data,%(data_type)s *partial_sum
                                    ,const int n
%(if_tail)s                         ,const int block_index
                                    )
            {
                extern __shared__ %(data_type)s shared_array[];

                int source_node_index = threadIdx.x;
                
                // Size of shared array that can contain input data

%(if_main)s     const int block_index  = blockIdx.x+1;
                const int block_offset = 2*block_index*blockDim.x;

                int target_node_index = threadIdx.x + (n/2);
                
                if (threadIdx.x == 0) 
                {
                    shared_array[0] = partial_sum[block_index];
                }
                
                __syncthreads();
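
                // partial_sum has already been exclusively scanned, so
                // partial_sum[block_index] is the combined total of all
                // preceding blocks; adding it to every element turns the
                // per-block scan into a global one.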
                
                // is this a bad call??
                %(data_type)s prev_block_sum = shared_array[0];
                
%(if_tail)s     if (source_node_index < n)
                {                    
                    o_data[source_node_index + block_offset] = SUM(prev_block_sum,o_data[source_node_index + block_offset]);
                }
                
%(if_tail)s     if (target_node_index < n) 
                {
                    o_data[target_node_index + block_offset] = SUM(prev_block_sum,o_data[target_node_index + block_offset]);
                }
            }

"""


def _div_ceil(nr, dr):
    # Integer ceiling of nr/dr, without any floating-point arithmetic
    return (int(nr) + int(dr) - 1) // int(dr)

def get_num_levels(n, c):
    # Number of scan levels needed for n items with c items per chunk,
    # i.e. ceil(log(n) to the base c), computed by repeated integer
    # division (c = 2*block_size need not be a power of two)
    l = 0
    while n != 1:
        l += 1
        n = _div_ceil(n, c)
    return l

def get_num_chunks(n, c, i):
    # Number of chunks left after i levels, i.e. ceil(n / c**i)
    return _div_ceil(n, pow(c, i))
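
# Worked example (illustrative): for n = 10000 items and a chunk of c = 16,
#   get_num_chunks(10000, 16, 1) = 625
#   get_num_chunks(10000, 16, 2) = 40
#   get_num_chunks(10000, 16, 3) = 3
#   get_num_chunks(10000, 16, 4) = 1
# so get_num_levels(10000, 16) == 4, and __call__ below allocates one
# partial-sum buffer for each of levels 1..3 (sizes 625, 40 and 3).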

# Pad shared memory by one element per 16 (2**4) elements to avoid bank
# conflicts on 16-bank (compute capability 1.x) devices.
# TODO: is this 4 future-proof? Fermi-class (2.x) devices have 32 banks.
padding_preamble = "\n#define SHARED_BANK_PADDING(n) ((n) >> 4)\n"
#padding_preamble = "\n#define SHARED_BANK_PADDING(n) 0\n"
    
class ExclusivePrefixKernel(object):
    def __init__(self, data_type, scan_operation, neutral,
        name = 'prefix_kernel', keep = False, options = [], preamble = ''):
          
        # Determine the size of used data type
        self.numpy_type = tools.parse_c_arg("const %s * in" % 
            data_type).dtype
        item_size = self.numpy_type.itemsize

        # Determine the number of threads
        dev = driver.Context.get_device()
        block_size = dev.get_attribute(driver.device_attribute.MAX_THREADS_PER_BLOCK)
        block_dimension =  dev.get_attribute(driver.device_attribute.MAX_BLOCK_DIM_X)
        self.block_size = min(block_size, block_dimension)

        # Shared memory size: two items per thread, number of threads,
        # size of one data item
        self.shared_size = self.get_chunk_size()*item_size
        # Padding to avoid bank conflicts
        # TODO: is this always 4??
        log_num_banks = 4
        self.shared_size += ((self.get_chunk_size() >> log_num_banks) * 
            item_size)

        # Check that we do not exceed the available shared memory size
        max_shared_size = dev.get_attribute(driver.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK)
        if self.shared_size > max_shared_size:
            warn("Reducing block size so that the scan chunk fits into shared memory")
            self.block_size = max_shared_size // (2 * item_size)
            # Account for the bank-conflict padding when shrinking the chunk
            while ((self.get_chunk_size()
                    + (self.get_chunk_size() >> log_num_banks)) * item_size
                    > max_shared_size):
                self.block_size -= 1
            self.shared_size = ((self.get_chunk_size()
                + (self.get_chunk_size() >> log_num_banks)) * item_size)

        self.main_function = self.get_main_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.main_function.prepare("PPi", block=(self.block_size, 1, 1),
            shared=self.shared_size)
            
        self.tail_function = self.get_tail_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.tail_function.prepare("PPii", block=(self.block_size, 1, 1),
            shared=self.shared_size)
            
        self.main_part_sum_function = self.get_main_part_sum_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.main_part_sum_function.prepare("PPPi", block=(self.block_size, 1, 1),
            shared=self.shared_size)
            
        self.tail_part_sum_function = self.get_tail_part_sum_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.tail_part_sum_function.prepare("PPPii", block=(self.block_size, 1, 1),
            shared=self.shared_size)
            
        self.main_uniform_sum_function = self.get_main_uniform_sum_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.main_uniform_sum_function.prepare("PPi", block=(self.block_size, 1, 1),
            shared=item_size)
        
        self.tail_uniform_sum_function = self.get_tail_uniform_sum_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.tail_uniform_sum_function.prepare("PPii", block=(self.block_size, 1, 1),
            shared=item_size)            
            
        # Use maximum available shared memory in 2.x devices
        # TODO: is it needed as we are more limited by number of threads?
        # Might be needed for large data types (?)
        if dev.compute_capability() >= (2, 0):
            cache_config = driver.func_cache.PREFER_SHARED
            self.main_function.set_cache_config(cache_config)
            self.tail_function.set_cache_config(cache_config)
            self.main_part_sum_function.set_cache_config(cache_config)
            self.tail_part_sum_function.set_cache_config(cache_config)
        
    def get_chunk_size(self):
        return 2 * self.block_size
        
    def get_main_function(self, data_type, scan_operation, neutral,
        name = 'prefix_kernel', keep = False, options = [], preamble = ''):
        
        src =    exclusive_scan_source % {
          'data_type': data_type,
          'name': name,
          'neutral': neutral,
          'scan_operation': scan_operation,
          'preamble': preamble+padding_preamble,
          'if_tail': "//",
          'if_main': "",
          'if_part_sum': "//",
          }
          
        return compiler.SourceModule(src, options=options, keep=keep).get_function(name)
    
    def get_tail_function(self, data_type, scan_operation, neutral,
        name = 'prefix_kernel', keep = False, options = [], preamble = ''):
        
        src =    exclusive_scan_source % {
          'data_type': data_type,
          'name': name,
          'neutral': neutral,
          'scan_operation': scan_operation,
          'preamble': preamble+padding_preamble,
          'if_tail': "",
          'if_main': "//",
          'if_part_sum': "//",
          }
          
        return compiler.SourceModule(src, options=options, keep=keep).get_function(name)
    
    def get_main_part_sum_function(self, data_type, scan_operation, neutral,
        name = 'prefix_kernel', keep = False, options = [], preamble = ''):
        
        src =    exclusive_scan_source % {
          'data_type': data_type,
          'name': name,
          'neutral': neutral,
          'scan_operation': scan_operation,
          'preamble': preamble+padding_preamble,
          'if_tail': "//",
          'if_main': "",
          'if_part_sum': "",
          }
          
        return compiler.SourceModule(src, options=options, keep=keep).get_function(name)

    def get_tail_part_sum_function(self, data_type, scan_operation, neutral,
        name = 'prefix_kernel', keep = False, options = [], preamble = ''):
        
        src =    exclusive_scan_source % {
          'data_type': data_type,
          'name': name,
          'neutral': neutral,
          'scan_operation': scan_operation,
          'preamble': preamble+padding_preamble,
          'if_tail': "",
          'if_main': "//",
          'if_part_sum': "",
          }
          
        return compiler.SourceModule(src, options=options, keep=keep).get_function(name)
        
    def get_main_uniform_sum_function(self, data_type, scan_operation, neutral,
        name = 'uniform_add_kernel', keep = False, options = [], preamble = ''):
        
        src =    uniform_sum_source % {
          'data_type': data_type,
          'name': name,
          'scan_operation': scan_operation,
          'preamble': preamble+padding_preamble,
          'if_tail': "//",
          'if_main': "",        
          }
          
        return compiler.SourceModule(src, options=options, keep=keep).get_function(name)
        
    def get_tail_uniform_sum_function(self, data_type, scan_operation, neutral,
        name = 'uniform_add_kernel', keep = False, options = [], preamble = ''):
        
        src =    uniform_sum_source % {
          'data_type': data_type,
          'name': name,
          'scan_operation': scan_operation,
          'preamble': preamble+padding_preamble,
          'if_tail': "",
          'if_main': "//",        
          }
          
        return compiler.SourceModule(src, options=options, keep=keep).get_function(name)
    
        
    def call_final(self,input_size,i_data,o_data):
      
        block_count = _div_ceil(input_size, self.get_chunk_size())
        
        if input_size != block_count * self.get_chunk_size():
            if block_count > 1:
                self.main_function.prepared_call((block_count-1, 1),
                    i_data.gpudata, o_data.gpudata, self.get_chunk_size())
            self.tail_function.prepared_call((1, 1), i_data.gpudata,
                o_data.gpudata,
                input_size - (block_count - 1) * self.get_chunk_size(), (block_count - 1))
        else:
            self.main_function.prepared_call((block_count, 1),
                i_data.gpudata, o_data.gpudata,
                self.get_chunk_size())
                
    def call_intermediate(self,input_size,i_data,o_data,part_sum_buf):
      
        block_count = _div_ceil(input_size, self.get_chunk_size())
        
        if input_size != block_count * self.get_chunk_size():
            if block_count > 1:
                self.main_part_sum_function.prepared_call((block_count-1, 1),
                    i_data.gpudata, o_data.gpudata,part_sum_buf.gpudata, self.get_chunk_size())
            self.tail_part_sum_function.prepared_call((1, 1), i_data.gpudata,
                o_data.gpudata,part_sum_buf.gpudata,
                input_size - (block_count - 1) * self.get_chunk_size(), (block_count - 1))
        else:
            self.main_part_sum_function.prepared_call((block_count, 1),
                i_data.gpudata, o_data.gpudata,part_sum_buf.gpudata,
                self.get_chunk_size())
        
    def call_uniform_add(self,input_size,i_data,o_data,part_sum_buf):       
      
        block_count = _div_ceil(input_size, self.get_chunk_size())
        
        if block_count <= 1:
            raise Exception("call_uniform_add called with a single block; "
                "there are no partial sums to add")
        
        if input_size != block_count * self.get_chunk_size():
            block_count -= 1
            if block_count > 1:
                self.main_uniform_sum_function.prepared_call((block_count-1, 1),
                    o_data.gpudata, part_sum_buf.gpudata, self.get_chunk_size())
            block_count += 1
            self.tail_uniform_sum_function.prepared_call((1, 1), o_data.gpudata,part_sum_buf.gpudata,
                input_size - (block_count - 1) * self.get_chunk_size(), (block_count - 1))
        else:
            block_count -= 1
            self.main_uniform_sum_function.prepared_call((block_count, 1),
                o_data.gpudata, part_sum_buf.gpudata,
                self.get_chunk_size())

        

    def __call__(self, *args, **kwargs):       
        
        i_data = kwargs.get('i_data')
        if i_data is None:
            i_data = args[0]
            
        o_data = kwargs.get('o_data')
        if o_data is None:
            o_data = args[1]
        
        n = kwargs.get('n')
        if n is None:
            # Accept n as a third positional argument (as used in __main__)
            n = args[2] if len(args) > 2 else min(i_data.size, o_data.size)
                
        # One partial-sum buffer per scan level below the top level
        part_sum_bufs = [gpuarray.empty(get_num_chunks(n, self.get_chunk_size(), l),
                             o_data.dtype)
                         for l in range(1, get_num_levels(n, self.get_chunk_size()))]

        # Level 0 scans the input into o_data; every further level scans the
        # previous level's partial sums in place
        callargsets = [[n, i_data, o_data]] + \
            [[ps_buf.size, ps_buf, ps_buf] for ps_buf in part_sum_bufs]

        # Attach to each level (except the last) the buffer that receives
        # its per-block totals
        for i, ps_buf in enumerate(part_sum_bufs):
            callargsets[i] += [ps_buf]

        # Scan all levels that produce partial sums ...
        for callargset in callargsets[0:-1]:
            self.call_intermediate(*callargset)

        # ... scan the last (smallest) level, which needs no partial sums ...
        self.call_final(*callargsets[-1])

        # ... and add the scanned partial sums back into each lower level
        for callargset in reversed(callargsets[0:-1]):
            self.call_uniform_add(*callargset)
          
        
if __name__=="__main__":    
    
    import time
    import pycuda.autoinit
    import numpy as np    
  
    sz = 1024*1024*3 - 2
    a = gpuarray.empty(sz, dtype=np.int32)
    # The scan is run in place here: b refers to the same GPUArray as a
    b = a

    a.fill(np.int32(1))

    krnl = ExclusivePrefixKernel('int', 'a+b', '0')

    time_start = time.time()

    krnl(a, b, sz)

    # Kernel launches are asynchronous; wait for completion before timing
    driver.Context.synchronize()

    print "kernels finished in %f seconds" % (time.time() - time_start)

    print "a = ", a.get()
    print "b = ", b.get()
                      