I write a fiber scheduler use threads,segfault accur on the code,what's the
matter? thanks!

private import core.thread;
private import std.stdio;

private class SyncQueue(T)
{

    private T[] queue;

    public synchronized void push(T o)
    {
        queue ~= o;
    }

    public synchronized T pop()
    {
        T o = null;
        if (queue.length > 1)
        {
            o = queue[0];
            queue = queue[1..$];
        }
        else if (queue.length == 1)
        {
            o = queue[0];
            queue = [];
        }
        return o;
    }

    public bool empty()
    {
        return queue.length ==0;
    }

}

Fiber spawn(Args...)(void function(Args) fn, Args args)
{
    Fiber fiber = Scheduler.instance.createFiber(fn,args);
    Scheduler.instance.schedule(fiber);
    return fiber;
}

Fiber spawn(Args...)(void delegate(Args) dg, Args args)
{
    Fiber fiber = Scheduler.instance.createFiber(dg,args);
    Scheduler.instance.schedule(fiber);
    return fiber;
}

private class Scheduler
{
    private SyncQueue!(Fiber) runQueue;
    private SyncQueue!(Fiber) reuseQueue;

    private static Scheduler _scheduler;

    private static this()
    {
        _scheduler = new Scheduler;
    }

    private this ()
    {
        runQueue   = new SyncQueue!(Fiber);
        reuseQueue = new SyncQueue!(Fiber);
    }

    public void schedule(Fiber fiber)
    {
        runQueue.push(fiber);
    }

    public void run(int threadNum =1)
    {
        ThreadGroup tg = new ThreadGroup;
        for (int i =0;i<threadNum;i++)
        {
            tg.create(
            {
                while (!runQueue.empty)
                {
                    Fiber fiber = runQueue.pop();
                    if (fiber is null)
                    {
                        continue;
                    }
                    else
                    {
                        fiber.call();
                        if (fiber.state == Fiber.State.TERM)
                        {
                            fiber.reset();
                            reuseQueue.push(fiber);//segmentation fault accur 
here
                        }
                        else
                        {
                            runQueue.push(fiber);//segmentation fault accur here
                        }
                    }
                }
            });
        }
        tg.joinAll;
    }

    public Fiber createFiber(Args...)(void function(Args) fn, Args args)
    {
        void proc()
        {
            fn(args);
        }
        Fiber fiber = reuseQueue.pop();
        if (!fiber)
            fiber = new Fiber(&proc);
        return fiber;
    }

    public Fiber createFiber(Args...)(void delegate(Args) dg, Args args)
    {
        void proc()
        {
            dg(args);
        }
        Fiber fiber = reuseQueue.pop();
        if (!fiber)
            fiber = new Fiber(&proc);
        return fiber;
    }

    public static Scheduler instance()
    {
        return _scheduler;
    }
}

void main()
{
    spawn(
    {
        writefln("%p",cast(void *) Fiber.getThis);
        spawn(
        {
           writefln("%p",cast(void *)Fiber.getThis);
        });
    });

    Scheduler.instance.run(4);
}

Reply via email to