Fwiw Dmitry <https://plus.google.com/113761107534428240799>, I have been 
working with fractals a lot lately, and was wondering if I could still 
program a proxy collector from scratch. Remember my old collector here:

http://webpages.charter.net/appcore/misc/pc_sample_h_v1.html

?

You are using it as an example in Relacy.

Wrt the following code, all of the membars are standalone fence operations 
std::atomic_thread_fence. 
All of the membars in the atomic operations are relaxed. It uses DWCAS for 
the acquire function because I wanted to start out simple. I have also 
replaced a CAS with XCHG in the collect, or mutate function if you will.  So 
far, it works in Relacy. :^)

Here is a link to my code:

https://pastebin.com/raw/RKTLyXWt

_______________________

/* Simple Proxy Collector (DWCAS) - Relacy Version

http://www.1024cores.net/home/relacy-race-detector

Copyright 3/27/2018
___________________________________________________*/


#include <relacy/relacy_std.hpp>
#include <cstdio>
#include <climits>


/* Membar Abstraction
___________________________________________________*/
#define ct_mb_relaxed std::memory_order_relaxed
#define ct_mb_acquire std::memory_order_acquire
#define ct_mb_release std::memory_order_release
#define ct_mb_seq_cst std::memory_order_seq_cst
#define ct_mb_fence(mb_membar) std::atomic_thread_fence(mb_membar) 


/* Simple Proxy Collector C++11
___________________________________________________*/
namespace ct { 
namespace proxy {

    // User object base
    struct object
    {
        virtual ~object() {}
    };


    // Proxy node
    struct node
    {
        std::atomic<std::intptr_t> count;
        VAR_T(node*) next;
        VAR_T(object*) obj;

        node(std::intptr_t count_, node* next_, object* obj_)
            : count(count_), next(next_), obj(obj_)
        {

        }

        ~node()
        {

        }
    };


    // DWCAS target
    struct anchor
    {
        std::intptr_t count;
        node* head;

        anchor()
        {

        }

        anchor(std::intptr_t count_, node* head_)
        {
            count = count_;
            head = head_;
        }

        bool operator == (anchor const& right) const
        {
            return count == right.count && head == right.head;
        }

        anchor operator + (anchor const& right) const
        {
            anchor res;
            res.count = count + right.count;
            res.head = right.head;
            return res;
        }

        anchor operator - (anchor const& right) const
        {
            anchor res;
            res.count = count - right.count;
            res.head = right.head;
            return res;
        }
    };

    std::ostream& operator << (std::ostream& s, anchor const& right)
    {
        return s << "{" << right.count << "," << right.head << "}";
    }


    // Collector Logic
    struct gc
    {
        std::atomic<anchor> head;

        gc() : head(anchor{ 0, new node(0, nullptr, nullptr) }) {}

        ~gc()
        {
            anchor cmp = head.load(ct_mb_relaxed);
            RL_ASSERT(cmp.count > -1);
            RL_ASSERT(VAR(cmp.head->next) == nullptr);
            prv_dtor(cmp.head);
        }

        // Drops a reference count on a node
        bool prv_release(node* n)
        {
            std::intptr_t count = n->count.fetch_sub(2, ct_mb_relaxed);
            if (count == 3) return true;
            return false;
        }

        // Deletes a node
        void prv_dtor(node* n)
        {
            object* obj = VAR(n->obj);
            if (obj != nullptr) delete obj;
            delete n;
        }

        // Dumps backlinked nodes
        void prv_dump(node* n)
        {
            node* cur = VAR(n->next);
            VAR(n->next) = nullptr;

            // Release and collect in cur LIFO
            while (prv_release(cur))
            {
                ct_mb_fence(ct_mb_acquire);
                node* next = VAR(cur->next);
                VAR(cur->next) = n;
                n = cur;
                cur = next;
            }

            // Destroy cur LIFO
            while (n)
            {
                node* next = VAR(n->next);
                prv_dtor(n);
                n = next;
            }
        }

        // Collects a node
        void prv_collect(node* n)
        {
            anchor xchg = { 0, n };

            ct_mb_fence(ct_mb_release);

            // Swap in new node
            anchor cmp = head.exchange(xchg, ct_mb_relaxed);

            // Backlink
            ct_mb_fence(ct_mb_acquire);
            VAR(cmp.head->next) = xchg.head;
            ct_mb_fence(ct_mb_release);

            // Release and mark as collected
            std::intptr_t count = cmp.head->count.fetch_add(cmp.count + 1, 
ct_mb_relaxed);

            if (count + cmp.count == 0)
            {
                prv_dump(cmp.head);
            }
        }

        
        // Acquires current node
        node* acquire()
        {
            anchor cmp = head.load(ct_mb_relaxed);
            anchor xchg = { cmp.count + 2, cmp.head };

            while (! head.compare_exchange_weak(cmp, xchg, ct_mb_relaxed)) 
            {
                xchg = { cmp.count + 2, cmp.head };
            }

            ct_mb_fence(ct_mb_acquire);

            return cmp.head;
        }
        

        // Release a node
        void release(node* n)
        {
            ct_mb_fence(ct_mb_release);

            if (prv_release(n))
            {
                ct_mb_fence(ct_mb_acquire);
                prv_dump(n);
            }
        }

        // Collects an object
        void collect(object* obj)
        {
            prv_collect(new node(2, nullptr, obj));
        }
    };
}}


// Test object
struct foo : public ct::proxy::object
{
    std::atomic<foo*> next;

    foo(foo* next_ = nullptr) : next(next_) { }

    virtual ~foo()
    {
        //std::cout << "foo::~foo()\n";
    }
};

// An atomic LIFO of test objects
struct foo_lifo
{
    std::atomic<foo*> head;

    foo_lifo(foo* next_ = nullptr) : head(next_) { }

    void push(foo* n)
    {
        foo* cmp = head.load(ct_mb_relaxed);

        do {
            n->next.store(cmp, ct_mb_relaxed);
            ct_mb_fence(ct_mb_release);
        }  while (! head.compare_exchange_weak(cmp, n, ct_mb_relaxed));
    }

    foo* flush()
    {
        foo* cmp = head.exchange(nullptr, ct_mb_relaxed);
        if (cmp) ct_mb_fence(ct_mb_acquire);
        return cmp;
    }
};


// Program Logic
#define READERS 5
#define WRITERS 3
#define ITERS 5
#define THREADS (READERS + WRITERS)

struct ct_proxy_test
    : rl::test_suite<ct_proxy_test, THREADS>
{
    ct::proxy::gc g_gc;
    foo_lifo g_lifo;


    void before()
    {

        //std::printf("before()\n");
    }


    void after()
    {
        //std::printf("after()\n");
    }


    // Reader threads...
    void thread_reader(unsigned int tidx)
    {
        //std::printf("thread_reader::%u\n", tidx);

        ct::proxy::node* pcn = g_gc.acquire();

        {
            foo* cur = g_lifo.head.load(ct_mb_relaxed);
            ct_mb_fence(ct_mb_acquire);

            while (cur)
            {
                foo* next = cur->next.load(ct_mb_relaxed);
                rl::yield(1, $);
                cur = next;
            }
        }

        g_gc.release(pcn);
    }


    // Writer threads...
    void thread_writer(unsigned int tidx)
    {
        //std::printf("thread_writer::%u\n", tidx);

        for (unsigned int i = 0; i < ITERS; ++i)
        {
            // Add some nodes, and peform some collections...
            g_lifo.push(new foo());
            g_lifo.push(new foo());

            if (! (i % 2)) g_gc.collect(nullptr);

            rl::yield(1, $);

            g_lifo.push(new foo());

            if (! (i % 3)) g_lifo.push(new foo());

            // Flush
            foo* cur = g_lifo.flush();

            // Collect the foo's... ;^)
            while (cur)
            {
                foo* next = cur->next.load(ct_mb_relaxed);
                rl::yield(1, $);
                g_gc.collect(cur);
                cur = next;
            }
        }
    }
    

    // Entry...
    void thread(unsigned int tidx)
    {
        if (tidx < READERS)
        {
            thread_reader(tidx);
        }

        else
        {
            thread_writer(tidx);
        }
    }
};



int main()
{
    {
        rl::test_params p;

        p.iteration_count = 12345600; // for existing proxy gc code
        //p.execution_depth_limit = 3333;
        //p.search_type = rl::random_scheduler_type;

        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<ct_proxy_test>(p);
    }

    std::puts("\nTest Complete!\n");
    std::fflush(stdout);
    std::getchar();

    return 0;
}

_______________________



Thanks.

-- 

--- 
You received this message because you are subscribed to the Google Groups 
"Scalable Synchronization Algorithms" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to lock-free+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/lock-free/8a6adfaa-61e3-4d71-9a81-0b6f9becca9b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to