Well, here is the pseudo-code sketch:
______________________________________________________________________
// Number of worker threads; also the size of g_threads.
constexpr int THREAD_COUNT = 4;
// Local backlog size above which produce_local_work starts offering newly
// produced items to threads that have requested work.
constexpr int LOCAL_WORK_THRESHOLD = 10;
// One unit of work. Items are chained through `next` so that a worker can be
// handed a small batch as an intrusive singly linked list.
struct work
{
    work* next;      // following item in the batch, or null at the tail
    void process();  // runs the work; defined elsewhere
};
// Message that an idle thread posts on another thread's m_work_request stack,
// asking it to hand over some work. request_work allocates it with `new`;
// try_to_give frees it with `delete`.
struct work_request
{
    struct thread* thread;  // the requesting thread, i.e. where given work goes
};
struct thread {
eventcount m_ecount;
int request_index; // init with (rand() % THREAD_COUNT)
mpsc_stack<work_request*> m_work_request;
mpsc_stack<work*> m_remote_work;
non_atomic_queue<work*> m_local_work;
};
// Work that any worker may pick up. Items are pushed and popped as work*
// (see produce_global_work / try_to_work), so the queue holds pointers.
static mpmc_queue<work*> g_global_work;
// One slot per worker. request_work and produce_global_work locate workers
// only through this array.
static thread g_threads[THREAD_COUNT];
bool try_to_give(thread* this, work* w) {
if (! this->m_work_request.empty()) {
work_request* wreq = this->m_work_request.trypop();
if (wreq) {
wreq->thread->m_remote_work.push(w);
wreq->thread->m_ecount.signal();
delete wreq;
return true;
}
}
return false;
}
// Gathers a small batch of work for `self`, linked through work::next.
//
// Sources, in order:
//   1. self's local queue. While other threads have pending requests, each
//      locally popped item is given to them (try_to_give) instead of kept.
//   2. At most one item from self's remote-work stack (work given to us).
//   3. At most one item from the global queue.
// Items from later sources are prepended to the batch.
//
// Returns the head of the batch, or nullptr if nothing was available.
work* try_to_work(thread* self) {
    work* whead = nullptr;

    // Keep popping local work until one item is kept for ourselves or the
    // local queue runs dry. Items that were handed off are no longer ours.
    for (;;) {
        whead = self->m_local_work.trypop();
        if (!whead) {
            break;
        }
        if (!try_to_give(self, whead)) {
            whead->next = nullptr;  // becomes the tail of our batch
            break;
        }
    }

    if (work* w = self->m_remote_work.trypop()) {
        w->next = whead;
        whead = w;
    }
    if (work* w = g_global_work.trypop()) {
        w->next = whead;
        whead = w;
    }
    return whead;
}
void request_work(thread* this) {
redo:
int request_index = this->request_index
for (int i = request_index; i < THREAD_COUNT; ++i) {
if (! g_threads[i].m_local_queue.empty()) {
this->request_index = i;
work_request* wreq = new work_request;
wreq->thread = this;
g_threads[i].m_work_request.push(wreq);
g_threads[i].m_ecount.signal();
return;
}
}
if (request_index != 0) {
this->request_index = 0;
goto redo;
}
}
// Blocks until `self` has work and returns the head of the batch. Never
// returns nullptr.
//
// Each round it takes an eventcount key, re-checks for work, asks another
// worker for work via request_work, and then waits on that key.
work* wait_for_work(thread* self) {
    work* w;
    while (!(w = try_to_work(self))) {
        eventcount::key wkey = self->m_ecount.get();
        // Re-check after taking the key, before committing to the wait.
        if ((w = try_to_work(self))) {
            break;
        }
        request_work(self);
        self->m_ecount.wait(wkey);
    }
    return w;
}
void thread_entry() {
thread this_thread;
work* w;
for (;;) {
w = wait_for_work(&this_thread);
while (w) {
work* next w->next;
w->process();
// produce_local_work(w) or
// produce_global_work(w) or
// delete w;
w = next;
}
}
}
// Publishes `w` on the global queue and wakes one worker to pick it up.
//
// It signals the first worker whose local queue is empty. If every worker has
// local work, it signals a random one.
// NOTE(review): reading another worker's non_atomic_queue from here may race
// with its owner; confirm that non_atomic_queue::empty() is safe to observe.
void produce_global_work(work* w) {
    g_global_work.push(w);
    for (int i = 0; i < THREAD_COUNT; ++i) {
        if (g_threads[i].m_local_work.empty()) {
            g_threads[i].m_ecount.signal();
            return;
        }
    }
    g_threads[rand() % THREAD_COUNT].m_ecount.signal();
}
void produce_local_work(thread* this, work* w) {
if (this->m_local_work.count() > LOCAL_WORK_THRESHOLD) {
if (try_to_give(this, w)) { return; }
}
this->m_local_work.push(w);
}
// Seeds the system with a random number (0..9999) of global work items.
void get_things_going() {
    // Draw the count once. Placing rand() in the loop condition would re-roll
    // it on every iteration and skew the count toward small values.
    const int count = rand() % 10000;
    for (int i = 0; i < count; ++i) {
        produce_global_work(new work(...));
    }
}
______________________________________________________________________
This is not work-stealing. Instead of a thread with nothing to do trying to
steal work from another, it can enqueue a request onto another thread's
work-request stack and signal that thread's eventcount. If the other thread has
work to spare, it gives work to the threads requesting it and signals each
requestor's eventcount. Global work is enqueued on a global queue and a
thread's eventcount is signaled. A thread can queue work locally, globally, or
give it to a requestor.
There are a TON of major improvements to be made to this highly crude and
naive algorithm for sure. But, it kind of seems like it can be made to
"work". Humm, I don't know. I need feedback.
Try not to flame me too hard okay?
;^O
Do you think it's worth creating an implementation of this thing? It should
not be that hard at all. Humm...


|