Robert, * Robert Haas (robertmh...@gmail.com) wrote: > Attached is a contrib module that lets you launch arbitrary command in > a background worker, and supporting infrastructure patches for core.
Very cool! Started looking into this while waiting on a few CLOBBER_CACHE_ALWAYS runs to finish (ugh...).

Perhaps I'm just being a bit over the top, but all this per-character work feels a bit ridiculous. When we're using MAXIMUM_ALIGNOF, I suppose it's not so bad, but is there no hope to increase that and make this whole process more efficient? Just a thought.

After reading through the code for 0001, I decided to actually take it out for a spin; see attached. I then passed a few megabytes of data through it and it seemed to work just fine.

In general, I'm quite excited about this capability and will be looking over the later patches also. I also prefer the function-pointer-based approach that was taken up in later versions to the hook-based approach in the initial patches, so I'm glad to see things going in that direction.

Lastly, I will say that I feel it'd be good to support bi-directional communication, as I think it'll be needed eventually, but I'm not sure that has to happen now.

Thanks!

Stephen
#include <stdio.h>
#include <limits.h>
#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>

/*
 * Standalone test harness for the iovec-walking logic of shm_mq_sendv().
 *
 * The program reads stdin in randomly sized pieces, groups the pieces into
 * a randomly sized iovec array, and pushes each array through
 * shm_mq_sendv(), which writes the bytes to a FILE stream (stdout here).
 * If the chunking logic is correct, stdout is byte-for-byte identical to
 * stdin.  Diagnostic output (vector count, read sizes) goes to stderr.
 */

typedef uint64_t uint64;
typedef uint8_t uint8;
typedef size_t Size;

#define MAXIMUM_ALIGNOF 8

/* Round LEN down to a multiple of ALIGNVAL (ALIGNVAL must be a power of 2). */
#define TYPEALIGN_DOWN(ALIGNVAL,LEN) \
    (((uintptr_t) (LEN)) & ~((uintptr_t) ((ALIGNVAL) - 1)))
#define MAXALIGN_DOWN(LEN) TYPEALIGN_DOWN(MAXIMUM_ALIGNOF, (LEN))

/*
 * Queue structure.  Only declared so that shm_mq_handle has something to
 * point at; nothing in this harness dereferences it.
 */
typedef struct shm_mq
{
    uint64      mq_bytes_read;
    uint64      mq_bytes_written;
    Size        mq_ring_size;
    bool        mq_detached;
    uint8       mq_ring_offset;
    char        mq_ring[];
} shm_mq;

/*
 * Per-sender state.  This harness uses only:
 *   mqh_partial_bytes        - bytes of the current message already written
 *   mqh_length_word_complete - cleared after each complete message
 *   output                   - stream that receives the message bytes
 * The remaining fields are never read or written here.
 */
typedef struct shm_mq_handle
{
    shm_mq     *mqh_queue;
    char       *mqh_buffer;
    Size        mqh_buflen;
    Size        mqh_consume_pending;
    Size        mqh_partial_bytes;
    Size        mqh_expected_bytes;
    bool        mqh_length_word_complete;
    bool        mqh_counterparty_attached;
    FILE       *output;
} shm_mq_handle;

/* One contiguous piece of a message. */
typedef struct
{
    const char *data;
    Size        len;
} shm_mq_iovec;

typedef enum
{
    SHM_MQ_SUCCESS,             /* Sent or received a message. */
    SHM_MQ_WOULD_BLOCK,         /* Not completed; retry later. */
    SHM_MQ_DETACHED             /* Other process has detached queue. */
} shm_mq_result;

shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data,
                          bool nowait);
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt,
                           bool nowait);

#define BUFSIZE 1024            /* each read requests rand() % BUFSIZE bytes */
#define VECSIZE 16              /* maximum number of iovecs per message */

/*
 * Copy stdin to stdout through shm_mq_sendv() using random chunk sizes.
 *
 * Returns 0 on success, 1 if allocation fails, stdin reports a read error,
 * or any write to stdout fails.
 */
int
main(void)
{
    shm_mq_handle mqh = {0};
    shm_mq_iovec vectors[VECSIZE] = {{NULL, 0}};
    int         randvec;
    int         status = 1;

    srand((unsigned int) time(NULL));

    mqh.output = stdout;

    randvec = (rand() % VECSIZE) + 1;
    fprintf(stderr, "vectors: %d\n", randvec);

    /*
     * Stop on a read error as well as on EOF: feof() never becomes true
     * after a read error, so testing it alone would loop forever sending
     * empty messages.
     */
    while (!feof(stdin) && !ferror(stdin))
    {
        for (int i = 0; i < randvec; i++)
        {
            int         randread = rand() % BUFSIZE;
            char       *buf;

            fprintf(stderr, "randread: %d\n", randread);

            /* Release the buffer from the previous round, if any. */
            free((void *) vectors[i].data);
            vectors[i].data = NULL;
            vectors[i].len = 0;

            /*
             * randread may be 0.  Allocate at least one byte so that a
             * NULL result always means failure and fread() never sees a
             * null pointer.
             */
            buf = malloc(randread > 0 ? (size_t) randread : 1);
            if (buf == NULL)
            {
                perror("malloc");
                goto done;
            }

            vectors[i].data = buf;
            vectors[i].len = fread(buf, 1, (size_t) randread, stdin);
        }

        if (shm_mq_sendv(&mqh, vectors, randvec, false) != SHM_MQ_SUCCESS)
            goto done;
    }

    if (ferror(stdin))
    {
        perror("stdin");
        goto done;
    }

    /* Buffered write errors only surface when the stream is flushed. */
    if (fflush(stdout) != 0)
    {
        perror("stdout");
        goto done;
    }

    status = 0;

done:
    for (int i = 0; i < randvec; i++)
        free((void *) vectors[i].data);

    return status;
}

/*
 * Write a single contiguous buffer as one message.
 *
 * Convenience wrapper that builds a one-element iovec and delegates to
 * shm_mq_sendv(); the result is whatever shm_mq_sendv() returns.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
    shm_mq_iovec iov;

    iov.data = data;
    iov.len = nbytes;

    return shm_mq_sendv(mqh, &iov, 1, nowait);
}

/*
 * Write a message described by an iovec array to mqh->output.
 *
 * The message is the concatenation of iov[0..iovcnt-1].  Every write except
 * the one that ends the message is issued in multiples of MAXIMUM_ALIGNOF
 * bytes.  When fewer than MAXIMUM_ALIGNOF bytes remain in an iovec that is
 * not the last one, bytes are gathered one at a time from successive iovecs
 * into a small temporary buffer and written from there.
 *
 * mqh->mqh_partial_bytes counts how many bytes of the current message have
 * reached the stream.  It is the starting position on entry, so a call that
 * failed part-way can be repeated with the same iovec array to resume.  It
 * is reset to 0 once the whole message has been written.
 *
 * nowait is accepted for interface compatibility and is not used here.
 *
 * Returns SHM_MQ_SUCCESS once the whole message is written (trivially so
 * for an empty message or iovcnt <= 0).  Returns SHM_MQ_DETACHED if the
 * stream accepts fewer bytes than requested; callers in this file only
 * test for != SHM_MQ_SUCCESS.
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
    Size        nbytes = 0;
    Size        offset;
    int         which_iov = 0;

    (void) nowait;

    /* Compute total size of write. */
    for (int i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /* Position within iov[which_iov]; normalized at the top of the loop. */
    offset = mqh->mqh_partial_bytes;

    while (mqh->mqh_partial_bytes < nbytes && which_iov < iovcnt)
    {
        Size        chunksize;
        Size        bytes_written;

        /* Figure out which bytes need to be sent next. */
        if (offset >= iov[which_iov].len)
        {
            offset -= iov[which_iov].len;
            ++which_iov;
            continue;
        }

        /*
         * Fewer than MAXIMUM_ALIGNOF bytes remain in this iovec and more
         * iovecs follow: assemble up to MAXIMUM_ALIGNOF bytes across the
         * iovec boundary (skipping empty iovecs) and write those.
         */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char        tmpbuf[MAXIMUM_ALIGNOF];
            Size        j = 0;

            while (j < MAXIMUM_ALIGNOF && which_iov < iovcnt)
            {
                if (offset < iov[which_iov].len)
                    tmpbuf[j++] = iov[which_iov].data[offset++];
                else
                {
                    offset -= iov[which_iov].len;
                    ++which_iov;
                }
            }

            bytes_written = fwrite(tmpbuf, 1, j, mqh->output);
            mqh->mqh_partial_bytes += bytes_written;

            /*
             * offset has already moved past all j gathered bytes, so we
             * must not keep going after a short write: the unwritten bytes
             * would be skipped.  mqh_partial_bytes records the true
             * position for a later retry.
             */
            if (bytes_written != j)
                return SHM_MQ_DETACHED;
            continue;
        }

        /*
         * If this is the last chunk, we can write all the data, even if it
         * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
         * MAXALIGN_DOWN the write size.
         */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);

        bytes_written = fwrite(&iov[which_iov].data[offset], 1, chunksize,
                               mqh->output);
        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;

        /* fwrite() returns a short count only when the write failed. */
        if (bytes_written != chunksize)
            return SHM_MQ_DETACHED;
    }

    /* Reset for next message. */
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_length_word_complete = false;

    return SHM_MQ_SUCCESS;
}
signature.asc
Description: Digital signature