I'm trying to query a single model from multiple threads. The use case is 
that a single query doesn't utilize the full power of the GPU, so throughput 
could be improved by running multiple queries in parallel instead of 
serializing the requests. I don't want to load multiple instances of the 
model onto the GPU due to memory constraints. Since I believe Theano is not 
fully thread-safe, the idea is to compile multiple instances of the 
computational graph (theano.function) and have each worker of the thread 
pool use one graph at a time. The shared variables of the model are only 
loaded once. See the example code below (the model there is just a dummy 
model for demonstrating the problem).

This works if the model is on the CPU, but fails if it is on the GPU. I get 
the following error:

GpuArrayException: b'cuEventCreate: CUDA_ERROR_INVALID_CONTEXT: invalid device context'

Is there a way to initialize the context for each individual thread, or to 
clone the context of the parent thread? Thanks in advance!

Example code:
------------------------------------------------------------------------------------------
import theano
from theano import tensor as T
import numpy as np
from collections import OrderedDict, namedtuple
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

#this huge matrix will be shared between computational graphs
W1 = theano.shared(np.random.rand(100,1000000).astype('float32'))

#function for defining the computational graphs
def create_graph():
    H = theano.shared(np.zeros(100, dtype='float32'))
    updatesH = OrderedDict()
    updatesH[H] = T.zeros_like(H)
    reset = theano.function([], updates=updatesH)
    X = T.fvector()
    updatesH[H] = H + X
    upd = theano.function([X], updates=updatesH)
    Y = T.dot(H, W1).mean()
    calc = theano.function([], Y)
    return reset, upd, calc

#queue of computational graphs; the workers will remove one computational
#graph at a time, use it for computations and then put it back
class GraphPoolHandler:
    def __init__(self, n_threads):
        self.graph_queue = Queue()
        for i in range(n_threads):
            #fill the queue with computational graphs
            self.graph_queue.put(create_graph())
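    def run(self, n, x):
        graph = self.graph_queue.get()
        reset, upd, calc = graph
        try:
            reset()
            for i in range(n):
                upd(x)
            return calc()
        finally:
            #put the graph back even if a call raised, so that later tasks
            #do not block forever on an empty queue
            self.graph_queue.put(graph)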

graph_pool = GraphPoolHandler(4)
thread_pool = ThreadPoolExecutor(max_workers=4)

def print_callback(v):
    print(v.result(), end='\n')

f1 = thread_pool.submit(graph_pool.run, 2, np.arange(100).astype('float32'))
f2 = thread_pool.submit(graph_pool.run, 1, np.arange(100).astype('float32'))
f3 = thread_pool.submit(graph_pool.run, 4, np.arange(100).astype('float32'))
f4 = thread_pool.submit(graph_pool.run, 3, np.arange(100).astype('float32'))
f5 = thread_pool.submit(graph_pool.run, 5, np.arange(100).astype('float32'))
f1.add_done_callback(print_callback)
f2.add_done_callback(print_callback)
f3.add_done_callback(print_callback)
f4.add_done_callback(print_callback)
f5.add_done_callback(print_callback)
------------------------------------------------------------------------------------------
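
For reference, the check-out/check-in pattern behind GraphPoolHandler can be 
sketched without Theano. This is only an illustration: ResourcePool, checkout 
and make_accumulator are hypothetical names, and make_accumulator is a 
plain-Python stand-in for create_graph. It never touches the GPU, so it says 
nothing about the CUDA context error itself; it only shows how each worker 
borrows one pre-built resource and hands it back.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from queue import Queue


class ResourcePool:
    """Hands out one pre-built resource at a time to each worker thread."""

    def __init__(self, factory, size):
        self._queue = Queue()
        for _ in range(size):
            self._queue.put(factory())

    @contextmanager
    def checkout(self):
        resource = self._queue.get()  # blocks until a resource is free
        try:
            yield resource
        finally:
            self._queue.put(resource)  # returned even if the worker raised


def make_accumulator():
    # Hypothetical stand-in for create_graph(): private state plus three
    # functions that close over it (reset, update, compute).
    state = {"h": 0.0}

    def reset():
        state["h"] = 0.0

    def upd(x):
        state["h"] += x

    def calc():
        return state["h"]

    return reset, upd, calc


def run(pool, n, x):
    # Borrow one resource, use it exclusively, then give it back.
    with pool.checkout() as (reset, upd, calc):
        reset()
        for _ in range(n):
            upd(x)
        return calc()


# Fewer resources than workers: surplus workers wait in checkout().
pool = ResourcePool(make_accumulator, size=2)
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(run, pool, n, 1.5) for n in (2, 1, 4, 3, 5)]
    results = [f.result() for f in futures]
```

Because each resource is reset at the start of run and is held by only one 
worker at a time, the results do not depend on which worker picks up which 
task.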
