On 5/27/22 06:55, Christian Köstlin wrote:

> I wonder how I can synchronize the "dumping" and the
> collection of the threads. Would be cool to have an efficient lockless
> implementation of appender ...

That turned out to be nontrivial.

The following is a draft I played with. Collector collects and Dumper dumps. For locking they use SpinLock, an undocumented internal feature of druntime's core.internal package. Its implementation (e.g. at /usr/include/dlang/dmd/core/internal/spinlock.d) refers to "test and test-and-set (TTAS)":

  https://en.wikipedia.org/wiki/Test_and_test-and-set

I learned about TTAS from Rikki Cattermole yesterday at TeaConf. :)

The code is included below and works on my system.

Ali

import std;
import std.datetime.stopwatch;
import core.thread;
import core.atomic;
import core.internal.spinlock;

// Number of worker threads spawned by main().
enum int workerCount = 8;
// How long each worker keeps adding data points.
enum Duration threadRunTime = 4.seconds;
// main() keeps dumping for one second longer than the workers run.
enum Duration mainRunTime = threadRunTime + 1.seconds;

/// Scope guard for a SpinLock: locks it on construction and unlocks it
/// when the guard is destroyed. Copying and assignment are disabled so
/// that exactly one guard is responsible for each acquisition.
shared struct ScopeLock {
  @disable this(this);
  @disable void opAssign(ref const(typeof(this)));

  SpinLock * lock;

  /// Acquires `lock` (spinning until it is available) and remembers it
  /// so the destructor can release it.
  this(shared(SpinLock) * lock) {
    this.lock = lock;
    lock.lock();
  }

  ~this() {
    // A default-initialized guard (ScopeLock.init) never acquired a lock;
    // skip the unlock instead of dereferencing a null pointer.
    if (lock !is null) {
      lock.unlock();
    }
  }
}

/// Accumulates data points, guarding them with its own spin lock.
/// Producers call add(); the Dumper drains the collector with aggregate().
struct Collector {
  long[] data;

  shared(SpinLock) lock;

  // Acquires `lock` and returns a guard that releases it at scope exit.
  auto scopeLock() shared {
    return ScopeLock(&lock);
  }

  // Appends one data point while holding the lock.
  void add(long i) shared {
    auto guard = scopeLock();

    // Deliberately naive: one append per data point. Real code would
    // likely use a smarter storage scheme.
    data ~= i;
  }

  // Appends the sum of everything collected so far to `where`, then
  // empties this collector. Again, real code would be more sophisticated.
  void aggregate(ref long[] where) shared {
    auto guard = scopeLock();

    const total = data.sum;
    where ~= total;

    data.length = 0;
    // Let later appends reuse the existing buffer instead of reallocating.
    (cast(long[])data).assumeSafeAppend();
  }
}

// Running total of everything dump() has reported, checked at the end of
// main(). As a plain (non-shared) module-level variable it is
// thread-local, so only the copy belonging to the thread that calls
// dump() is updated.
long allThatHasBeenDumped;

// Sum of everything the worker threads added, updated atomically by them.
// Used only for validating the code.
shared long allCollectedByThreads;

/// Keeps the list of registered collectors and drains them on demand.
///
/// Being a `synchronized` class, all member functions lock this object's
/// monitor, so register() and dump() never run concurrently.
synchronized class Dumper {
private:
  shared(Collector)*[] collectors;

  // Adds a collector so that subsequent dumps include its data.
  void register(shared(Collector) * collector) shared {
    writeln("registering ", collector);
    collectors ~= collector;
  }

  // Drains every registered collector and, if the combined sum is
  // non-zero, writes the per-collector sums to `output` and adds the
  // combined sum to allThatHasBeenDumped. That variable is thread-local,
  // so the calling thread's copy is the one updated.
  void dump(File output) shared {
    long[] data;

    foreach (collector; collectors) {
      collector.aggregate(data);
    }

    const allData = data.sum;

    if (allData != 0) {
      // Honor the caller-supplied destination instead of hard-coding stdout.
      output.writefln!"Just collected:%-(\n  %,s%)"(data);
      allThatHasBeenDumped += allData;
    }
  }
}

// The single Dumper shared by every thread.
shared(Dumper) dumper;

// Shared module constructor: runs once for the whole program and creates
// the Dumper.
shared static this() {
  writeln("Making a Dumper");
  dumper = new Dumper;
}

// This thread's collector. Only the pointee is `shared`; the pointer
// variable itself is thread-local, so each thread has its own.
shared(Collector) * collector;

// Thread-local module constructor: runs in every thread, giving it a
// fresh Collector and registering that collector with the dumper.
static this() {
  writeln("Making a Collector");
  auto fresh = new shared(Collector)();
  collector = fresh;
  dumper.register(fresh);
}

// Worker thread entry point: runs doWorkImpl() and reports anything it
// throws on stderr instead of letting it escape the thread.
void doWork() {
  if (auto exc = collectException!Throwable(doWorkImpl())) {
    stderr.writeln("Caught Throwable: ", exc.msg);
  }
}

// The implementation of each worker thread. It adds the values
// 0, 1, 2, ... to this thread's collector for threadRunTime. It then
// reports how many items it added and their expected sum, and adds that
// sum to allCollectedByThreads for validation in main().
void doWorkImpl() {
  auto sw = StopWatch(AutoStart.yes);

  // Number of data points added so far; the values added are 0 .. count-1.
  long count = 0;
  while (sw.peek < threadRunTime) {
    collector.add(count);
    ++count;
  }

  // Sum of 0 + 1 + ... + (count - 1); zero when nothing was added.
  // Keeping `count` intact makes the reported item count exact.
  const total = count * (count - 1) / 2;
  writefln("Thread collected %,s items equaling %,s with %s",
           count, total, collector);

  atomicOp!"+="(allCollectedByThreads, total);
}

// Spawns the workers and dumps periodically while they run. It then waits
// for every worker to finish, performs a final dump and checks that
// everything the workers added was dumped.
void main() {
  writeln("main started");
  foreach (_; 0 .. workerCount) {
    spawn(&doWork);
  }

  auto sw = StopWatch(AutoStart.yes);

  while (sw.peek < mainRunTime) {
    dumper.dump(stdout);
    Thread.sleep(100.msecs);
  }

  // mainRunTime outlasts threadRunTime only by a time margin, which does
  // not guarantee that the workers are done. Join them so that nothing is
  // added after the final dump and every worker has published its total.
  thread_joinAll();

  // One final collection (and dump):
  dumper.dump(stdout);

  // Workers update allCollectedByThreads with atomicOp; read it atomically.
  assert(allThatHasBeenDumped == atomicLoad(allCollectedByThreads));
}

Reply via email to