Hi, for quite some time, I've been under the impression that there's still one disadvantage left from using processes instead of threads: we can only use statically sized chunks of shared memory. Every component that wants to use shared memory needs to pre-allocate whatever it thinks is sufficient. It cannot enlarge its share, nor can unused memory be allocated to other components.
Having written a very primitive kind of a dynamic memory allocator for imessages [1], I've always wanted a better alternative. So I've investigated a bit, refactored step-by-step, and finally came up with the attached, lock-based dynamic shared memory allocator. Its interface is as simple as malloc() and free(). A restart of the postmaster should truncate the whole area. Being a component which needs to pre-allocate its area in shared memory in advance, you need to define a maximum size for the pool of dynamically allocatable memory. That's currently defined in shmem.h instead of a GUC. This kind of feature has been requested at the Tokyo Clustering Meeting (by myself) in 2009 and is listed on the Wiki [2]. I'm now using that allocator as the basis for a reworked imessages patch, which I've attached as well. Both are tested as a basis for Postgres-R. While I think other components could use this dynamic memory allocator, too, I didn't write any code for that. Imessages currently is the only user available. (So please apply the dynshmem patch first, then imessages). Comments? Greetings from Oxford, and thanks to Joachim Wieland for providing me the required Internet connectivity ;-) Markus Wanner [1]: Postgres-R: internal messages http://archives.postgresql.org/message-id/4886db0b.1090...@bluegap.ch [2]: Mentioned Cluster Feature http://wiki.postgresql.org/wiki/ClusterFeatures#Dynamic_shared_memory_allocation For git addicts: here's a git repository with both patches applied: http://git.postgres-r.org/?p=imessages;a=summary
*** src/backend/storage/ipc/wamalloc.c c21a430ced2cdc65a8d7e6e9243153b837e44ff6 --- src/backend/storage/ipc/wamalloc.c c21a430ced2cdc65a8d7e6e9243153b837e44ff6 *************** *** 1,0 **** --- 1,670 ---- + /*------------------------------------------------------------------------- + * + * wamalloc.c + * simple, lock-based dynamic memory allocator for shared memory. + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ + + #include "postgres.h" + + #include "miscadmin.h" + #include "storage/pg_shmem.h" + #include "storage/shmem.h" + #include "storage/spin.h" + + + /* + * The number of sizeclasses and heaps (per sizeclass) to partition the + * shared memory area into. + */ + #define NUMBER_OF_SIZECLASSES 16 + #define NUMBER_OF_HEAPS 1 + + /* + * The number of bits of a pointer, which always remain zero due to the + * machine or implementation specific alignment. Higher numbers allow more + * bits to be used for credits, but waste more space. + */ + #define POINTER_ALIGNMENT 6 + + /* + * The number of bits to use for the count field. Again, plus the maximum + * value. It must be able to store the maximum number of blocks fitting in + * a superblock. + */ + #define BLOCK_COUNT_BITS 15 + #define MAX_BLOCK_COUNT 32768 + + /* + * Size of the superblocks to allocate, not including the superblock header + * of struct t_wam_superblock. 
+ */ + #define SUPERBLOCK_SIZE 65536 + + /* + * Descriptor state constants + */ + #define STATE_EMPTY 0 + #define STATE_PARTIAL 2 + #define STATE_FULL 3 + + + struct t_wam_heap; + typedef volatile struct t_wam_heap* t_heap_ptr; + + #pragma pack(push) /* push current alignment to stack */ + #pragma pack(1) /* set alignment to 1 byte boundary */ + + typedef struct t_wam_anchor + { + union + { + struct + { + uint64_t avail:(62 - BLOCK_COUNT_BITS), + count:BLOCK_COUNT_BITS, + state:2; + }; + uint64_t value; + }; + } t_anchor; + + #pragma pack(pop) /* restore original alignment from stack */ + + struct t_wam_desc + { + volatile struct t_wam_desc *next; + #ifdef MAGIC + MAGIC_TYPE magic; + #endif + struct t_wam_anchor Anchor; + volatile void *sb; + t_heap_ptr heap; + unsigned int sz; + unsigned int maxcount; + slock_t DescLock; + }; + typedef volatile struct t_wam_desc* t_desc_ptr; + + struct t_wam_sizeclass + { + t_desc_ptr partial_desc_list; + unsigned int sz; + }; + typedef volatile struct t_wam_sizeclass* t_sizeclass_ptr; + + struct t_wam_superblock + { + volatile struct t_wam_superblock *next; + }; + typedef volatile struct t_wam_superblock* t_superblock_ptr; + + struct t_wam_heap + { + t_desc_ptr Active; + t_sizeclass_ptr sc; + slock_t HeapLock; + }; + + /* + * Global control structure, appears exactly once. + */ + struct t_wam_control + { + t_sizeclass_ptr sizeclasses; /* global sizeclasses */ + t_heap_ptr global_heap; /* multiple heaps per sizeclass */ + t_desc_ptr DescAvail; /* list of avail. descriptors */ + t_superblock_ptr SuperblockAvail; /* list of avail. 
superblocks */ + slock_t WamLock; /* big allocator lock */ + }; + typedef volatile struct t_wam_control* t_control_ptr; + + /* single global entry point(er) */ + static volatile struct t_wam_control *ctl; + + #define POINTER_MASK ((intptr_t) (-1) << POINTER_ALIGNMENT) + #define IS_ALIGNED(ptr) (((intptr_t) (ptr) & (~POINTER_MASK)) == 0) + #define ALIGN_POINTER(ptr) ((void*) (((intptr_t) ptr + ~POINTER_MASK) & POINTER_MASK)) + #define ALIGN_SIZE(i) (((intptr_t) i + ~POINTER_MASK) & POINTER_MASK) + #define USABLE_SUPERBLOCK_SIZE (SUPERBLOCK_SIZE - \ + sizeof(struct t_wam_superblock)) + + static t_heap_ptr find_heap(size_t request_size); + static t_superblock_ptr wam_alloc_sblock(void); + static void wam_free_sblock(t_superblock_ptr sb); + static volatile void* wam_desc_alloc(void); + + static void wam_partial_list_push(t_sizeclass_ptr sc, t_desc_ptr desc); + static t_desc_ptr wam_partial_list_pop(t_sizeclass_ptr sc); + + static void wam_desc_free(volatile void *ptr); + static void* wam_alloc_from_partial(t_heap_ptr heap); + static void* wam_alloc_from_sblock(t_heap_ptr heap); + + + static t_heap_ptr + find_heap(size_t request_size) + { + int sizeclass_no; + int heap_no; + + /* + * We require an additional header to point to the descriptor. Possibly + * add 64bit for a magic value to be checked. + */ + request_size += sizeof(t_desc_ptr); + + /* + * FIXME: there certainly are more efficient ways to find the target + * sizeclass. 
+ */ + sizeclass_no = 0; + while (ctl->sizeclasses[sizeclass_no].sz < request_size) + sizeclass_no++; + Assert(sizeclass_no < NUMBER_OF_SIZECLASSES); + + heap_no = MyProcPid % NUMBER_OF_HEAPS; + + return &ctl->global_heap[sizeclass_no * NUMBER_OF_HEAPS + heap_no]; + } + + static t_superblock_ptr + wam_alloc_sblock(void) + { + t_superblock_ptr sb; + + sb = ctl->SuperblockAvail; + if (sb) + ctl->SuperblockAvail = sb->next; + + return sb; + } + + static void + wam_free_sblock(t_superblock_ptr sb) + { + sb->next = ctl->SuperblockAvail; + ctl->SuperblockAvail = sb; + } + + static volatile void* + wam_desc_alloc(void) + { + t_desc_ptr desc, next, this; + int i, max, dsize; + + desc = (t_desc_ptr) ctl->DescAvail; + + if (!desc) + { + desc = (t_desc_ptr) wam_alloc_sblock(); + + if (desc) + { + dsize = ALIGN_SIZE(sizeof(struct t_wam_desc)); + max = USABLE_SUPERBLOCK_SIZE / dsize; + + this = (t_desc_ptr) (((char*) desc) + dsize * (max - 1)); + this->next = NULL; + #ifdef MAGIC + this->magic = MAGIC_VALUE; + #endif + for (i = max - 2; i >= 0; i--) + { + this = (t_desc_ptr) (((char*) desc) + dsize * (i)); + next = (t_desc_ptr) (((char*) desc) + dsize * (i+1)); + + this->next = next; + #ifdef MAGIC + this->magic = MAGIC_VALUE; + #endif + } + + } + } + + if (desc) + ctl->DescAvail = desc->next; + + return (void*) desc; + } + + static void + wam_partial_list_push(t_sizeclass_ptr sc, t_desc_ptr desc) + { + #ifdef MAGIC + Assert(desc->magic == MAGIC_VALUE); + #endif + + desc->next = sc->partial_desc_list; + sc->partial_desc_list = desc; + } + + static t_desc_ptr + wam_partial_list_pop(t_sizeclass_ptr sc) + { + t_desc_ptr desc; + + desc = (t_desc_ptr) sc->partial_desc_list; + + #ifdef MAGIC + if (desc) + Assert(desc->magic == MAGIC_VALUE); + #endif + + if (desc) + sc->partial_desc_list = desc->next; + + return desc; + } + + static void + wam_desc_free(volatile void *ptr) + { + t_desc_ptr desc = ptr; + + #ifdef MAGIC + Assert(desc->magic == MAGIC_VALUE); + #endif + + desc->next = 
ctl->DescAvail; + ctl->DescAvail = desc; + } + + static void* + wam_alloc_from_partial(const t_heap_ptr heap) + { + t_desc_ptr desc; + t_anchor oldanchor; + intptr_t *addr; + + SpinLockAcquire(&heap->HeapLock); + + desc = heap->Active; + + retry: + if (!desc) + { + SpinLockRelease(&heap->HeapLock); + return NULL; + } + + SpinLockAcquire(&desc->DescLock); + + #ifdef MAGIC + Assert(desc->magic == MAGIC_VALUE); + #endif + + if (desc->Anchor.state == STATE_FULL) + { + SpinLockAcquire(&ctl->WamLock); + SpinLockRelease(&desc->DescLock); + + desc = wam_partial_list_pop(heap->sc); + heap->Active = desc; + + SpinLockRelease(&ctl->WamLock); + + goto retry; + } + else if (desc->Anchor.state == STATE_EMPTY) + { + SpinLockAcquire(&ctl->WamLock); + SpinLockRelease(&desc->DescLock); + + wam_desc_free(desc); + desc = wam_partial_list_pop(heap->sc); + heap->Active = desc; + + SpinLockRelease(&ctl->WamLock); + + goto retry; + } + + /* release the heap lock as early as possible */ + SpinLockRelease(&heap->HeapLock); + + desc->heap = heap; + + oldanchor.value = desc->Anchor.value; + + Assert(desc->Anchor.state == STATE_PARTIAL); + Assert(desc->Anchor.count > 0); + + desc->Anchor.count -= 1; + desc->Anchor.state = (desc->Anchor.count > 0 ? 
+ STATE_PARTIAL : STATE_FULL); + + Assert(desc->Anchor.avail < desc->maxcount); + addr = (intptr_t*) ((Pointer) desc->sb + + desc->Anchor.avail * desc->sz); + Assert(*addr >= 0); + Assert(*addr < desc->maxcount); + + desc->Anchor.avail = *addr; + Assert(desc->Anchor.avail < desc->maxcount); + + SpinLockRelease(&desc->DescLock); + + *addr = (intptr_t) desc; + + return addr + 1; + } + + static void* + wam_alloc_from_sblock(const t_heap_ptr heap) + { + t_desc_ptr desc; + unsigned int i; + intptr_t *addr = NULL; + + SpinLockAcquire(&heap->HeapLock); + SpinLockAcquire(&ctl->WamLock); + + desc = wam_desc_alloc(); + if (desc) + { + #ifdef MAGIC + Assert(desc->magic == MAGIC_VALUE); + #endif + + desc->heap = heap; + Assert(IS_ALIGNED(heap->sc->sz)); + desc->sz = heap->sc->sz; + desc->maxcount = USABLE_SUPERBLOCK_SIZE / desc->sz; + Assert(desc->maxcount < MAX_BLOCK_COUNT); + Assert(desc->maxcount > 2); + + desc->Anchor.avail = 1; + desc->Anchor.count = desc->maxcount - 1; + + desc->Anchor.state = STATE_PARTIAL; + + /* allocate a superblock and initialize it */ + desc->sb = wam_alloc_sblock(); + + if (desc->sb) + { + /* organize blocks in a linked list starting with index 0 */ + i = 0; + addr = (intptr_t*) desc->sb; + while (1) + { + i++; + if (i < desc->maxcount) + { + *addr = i; + addr += desc->sz / sizeof(Pointer); + } + else + { + *addr = 0; + break; + } + } + + addr = (intptr_t*) desc->sb; + *addr = (intptr_t) desc; + + if (heap->Active) + wam_partial_list_push(heap->sc, desc); + else + heap->Active = desc; + + addr += 1; + } + else + wam_desc_free(desc); + } + + SpinLockRelease(&ctl->WamLock); + SpinLockRelease(&heap->HeapLock); + + return addr; + } + + void* + wam_alloc(size_t sz) + { + t_heap_ptr heap; + Pointer addr = NULL; + + #ifdef MAGIC + sz += 2 * sizeof(MAGIC_TYPE); + #endif + + heap = find_heap(sz); + Assert(heap); + + START_CRIT_SECTION(); + { + addr = wam_alloc_from_partial(heap); + if (!addr) + addr = wam_alloc_from_sblock(heap); + } + END_CRIT_SECTION(); 
+ + #ifdef MAGIC + *((MAGIC_TYPE*) addr) = MAGIC_VALUE; + *((MAGIC_TYPE*) (addr + heap->sc->sz - sizeof(Pointer) + - sizeof(MAGIC_TYPE))) = MAGIC_VALUE; + addr += sizeof(MAGIC_TYPE); + #endif + + if (!addr) + elog(WARNING, "out of dynamic shared memory!"); + + return addr; + } + + void + wam_free(void* ptr) + { + t_desc_ptr desc; + t_heap_ptr heap = NULL; + t_anchor anchor, oldanchor; + t_superblock_ptr freeable_sb = NULL; + Pointer addr; + + Assert(ptr); + + addr = (Pointer) ptr; + + #ifdef MAGIC + addr -= sizeof(MAGIC_TYPE); + Assert(*((MAGIC_TYPE*) addr) == MAGIC_VALUE); + #endif + + addr -= sizeof(Pointer); + desc = *((t_desc_ptr*) addr); + + Assert(desc->sb); + Assert((intptr_t) desc > 0xff); + + START_CRIT_SECTION(); + { + SpinLockAcquire(&desc->DescLock); + + #ifdef MAGIC + Assert(desc->magic == MAGIC_VALUE); + Assert(*((MAGIC_TYPE*)(addr + desc->heap->sc->sz + - sizeof(MAGIC_TYPE))) == MAGIC_VALUE); + #endif + + Assert(desc->Anchor.count < desc->maxcount); + + *((intptr_t*) addr) = desc->Anchor.avail; + oldanchor.value = desc->Anchor.value; + if (desc->Anchor.count == desc->maxcount - 1) + { + Assert(desc->Anchor.state == STATE_PARTIAL); + + desc->Anchor.avail = 0; /* just here for the compiler */ + desc->Anchor.count = 0; /* just here for the compiler */ + desc->Anchor.state = STATE_EMPTY; + + freeable_sb = desc->sb; + desc->sb = NULL; + } + else + { + heap = desc->heap; + + desc->Anchor.avail = (addr - (Pointer) desc->sb) / desc->sz; + Assert(desc->Anchor.avail < desc->maxcount); + desc->Anchor.count++; + Assert(desc->Anchor.count < desc->maxcount); + desc->Anchor.state = STATE_PARTIAL; + } + + anchor.value = desc->Anchor.value; + + SpinLockRelease(&desc->DescLock); + + if (freeable_sb) + { + Assert(anchor.state == STATE_EMPTY); + + SpinLockAcquire(&ctl->WamLock); + wam_free_sblock(freeable_sb); + SpinLockRelease(&ctl->WamLock); + } + else if (oldanchor.state == STATE_FULL) + { + Assert(heap != NULL); + + SpinLockAcquire(&ctl->WamLock); + 
wam_partial_list_push(heap->sc, desc); + SpinLockRelease(&ctl->WamLock); + } + } + END_CRIT_SECTION(); + } + + void + wam_check(void *ptr) + { + Pointer addr; + t_desc_ptr desc; + + Assert(ptr); + + addr = (Pointer) ptr; + + #ifdef MAGIC + addr -= sizeof(MAGIC_TYPE); + Assert(*((MAGIC_TYPE*) addr) == MAGIC_VALUE); + #endif + + addr -= sizeof(Pointer); + desc = *((t_desc_ptr*) addr); + + Assert((intptr_t) desc > 0xff); + + #ifdef MAGIC + Assert(*((MAGIC_TYPE*)(addr + desc->heap->sc->sz + - sizeof(MAGIC_TYPE))) == MAGIC_VALUE); + Assert(desc->magic == MAGIC_VALUE); + #endif + + Assert(desc->Anchor.avail < desc->maxcount); + Assert(desc->Anchor.count < desc->maxcount); + } + + int + wam_size(void) + { + /* FIXME: correct size calculation */ + return MAXALIGN(sizeof(struct t_wam_control) + + sizeof(struct t_wam_sizeclass) * NUMBER_OF_SIZECLASSES + + sizeof(struct t_wam_heap) * NUMBER_OF_HEAPS * NUMBER_OF_SIZECLASSES + + 4 * POINTER_MASK + + ShmemDynBufferSize); + } + + void + wam_init() + { + Pointer ptr, shmem; + t_superblock_ptr sb; + int i, j, size; + bool found; + + size = wam_size(); + ptr = shmem = ShmemInitStruct("ShmemDynAllocCtl", size, &found); + + if (found) + return; + + ctl = (t_control_ptr) ptr; + ptr += sizeof(struct t_wam_control); + ptr = ALIGN_POINTER(ptr); + + /* + * Create a useful set of sizeclasses. 
+ */ + ctl->sizeclasses = (t_sizeclass_ptr) ptr; + ctl->sizeclasses[0].partial_desc_list = NULL; + ctl->sizeclasses[0].sz = 1 << POINTER_ALIGNMENT; + for (i = 1; i < NUMBER_OF_SIZECLASSES; i++) + { + ctl->sizeclasses[i].partial_desc_list = NULL; + ctl->sizeclasses[i].sz = ctl->sizeclasses[i-1].sz * 2; + + Assert(IS_ALIGNED(ctl->sizeclasses[i].sz)); + + Assert(USABLE_SUPERBLOCK_SIZE / ctl->sizeclasses[i].sz < + MAX_BLOCK_COUNT); + } + ptr += sizeof(struct t_wam_sizeclass) * NUMBER_OF_SIZECLASSES; + ptr = ALIGN_POINTER(ptr); + + /* + * Initialize the global heap control structures + */ + ctl->global_heap = (t_heap_ptr) ptr; + for (i = 0; i < NUMBER_OF_SIZECLASSES; i++) + { + for (j = 0; j < NUMBER_OF_HEAPS; j++) + { + t_heap_ptr heap = &ctl->global_heap[i * NUMBER_OF_HEAPS + j]; + + SpinLockInit(&heap->HeapLock); + heap->Active = NULL; + heap->sc = &ctl->sizeclasses[i]; + } + } + ptr += sizeof(struct t_wam_heap) * + NUMBER_OF_HEAPS * NUMBER_OF_SIZECLASSES; + ptr = ALIGN_POINTER(ptr); + + /* + * Initialize the stack for parking available descriptors. + */ + ctl->DescAvail = NULL; + SpinLockInit(&ctl->WamLock); + + /* + * Organize the remaining space of shared memory into a singly linked + * lits of superblocks of size SUPERBLOCK_SIZE. + */ + ptr = ALIGN_POINTER(ptr); + + ctl->SuperblockAvail = NULL; + while ((ptr - shmem) < (size - SUPERBLOCK_SIZE)) + { + sb = (t_superblock_ptr) ptr; + ptr += SUPERBLOCK_SIZE; + + Assert(IS_ALIGNED(ptr)); + + /* push onto the stack */ + sb->next = ctl->SuperblockAvail; + ctl->SuperblockAvail = sb; + + i++; + } + } ============================================================ *** src/backend/storage/ipc/Makefile 6c6ff1afae22b8de6ffc2d092407478e66de6ff7 --- src/backend/storage/ipc/Makefile 0610a969a64989edaa2c19f9c25362bd888bdeb9 *************** OBJS = ipc.o ipci.o pmsignal.o procarray *** 16,21 **** endif OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \ ! 
sinval.o sinvaladt.o standby.o include $(top_srcdir)/src/backend/common.mk --- 16,21 ---- endif OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \ ! sinval.o sinvaladt.o standby.o wamalloc.o include $(top_srcdir)/src/backend/common.mk ============================================================ *** src/backend/storage/ipc/ipci.c e342a127d85e601576379a4f58fb62c1fc23d97c --- src/backend/storage/ipc/ipci.c 2c2131711f38125a39ec0c9dfaef970ca873d142 *************** CreateSharedMemoryAndSemaphores(bool mak *** 104,109 **** --- 104,110 ---- size = add_size(size, hash_estimate_size(SHMEM_INDEX_SIZE, sizeof(ShmemIndexEnt))); size = add_size(size, BufferShmemSize()); + size = add_size(size, ShmemDynAllocSize()); size = add_size(size, LockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); *************** CreateSharedMemoryAndSemaphores(bool mak *** 185,190 **** --- 186,196 ---- InitShmemIndex(); /* + * Set up dynamically allocatable area in shared memory. + */ + ShmemDynAllocInit(); + + /* * Set up xlog, clog, and buffers */ XLOGShmemInit(); ============================================================ *** src/backend/storage/ipc/shmem.c bf9711329c544168e6b38dcf18a4e15a9a727b5c --- src/backend/storage/ipc/shmem.c 111243814fbf7d29e7f0ed36f031577ff101bba9 *************** InitShmemAllocation(void) *** 140,145 **** --- 140,161 ---- } /* + * Initialization of shared memory for dynamically allocated shared memory. + */ + int + ShmemDynAllocSize(void) + { + return wam_size(); + } + + void + ShmemDynAllocInit(void) + { + wam_init(); + } + + + /* * ShmemAlloc -- allocate max-aligned chunk from shared memory * * Assumes ShmemLock and ShmemSegHdr are initialized. 
*************** ShmemInitStruct(const char *name, Size s *** 411,416 **** --- 427,452 ---- } + void * + ShmemDynAlloc(size_t size) + { + return wam_alloc(size); + } + + void + ShmemDynFree(void *ptr) + { + wam_free(ptr); + } + + #ifdef MAGIC + void + ShmemDynCheck(void *ptr) + { + wam_check(ptr); + } + #endif + /* * Add two Size values, checking for overflow */ ============================================================ *** src/backend/storage/ipc/shmqueue.c d01f0bbb903a14e7fd4d2a5ec65985b32cad3718 --- src/backend/storage/ipc/shmqueue.c 2fbe744d1fc4b2a45f1eaa1704276a23edd3f542 *************** *** 26,31 **** --- 26,32 ---- #include "postgres.h" #include "storage/shmem.h" + #include "storage/spin.h" /* *************** SHMQueueEmpty(SHM_QUEUE *queue) *** 173,175 **** --- 174,241 ---- } return FALSE; } + + + void + deque_init(Deque *list) + { + DequeElem *tail; + + SpinLockInit(&list->lock); + + list->head.next = &list->head; + list->tail = &list->head; + + tail = list->tail; + Assert(tail->next == &list->head); + } + + void + deque_enqueue(Deque *list, DequeElem *elem) + { + DequeElem *tail; + + Assert(ShmemAddrIsValid(elem)); + Assert(list); + Assert(list->tail); + + tail = list->tail; + + Assert(ShmemAddrIsValid(tail)); + Assert(tail->next == &list->head); + + elem->next = &list->head; + tail->next = elem; + list->tail = elem; + + tail = list->tail; + Assert(tail->next == &list->head); + } + + DequeElem * + deque_dequeue(Deque *list) + { + DequeElem *tail; + DequeElem *elem = NULL; + + Assert(list); + Assert(list->tail); + + tail = list->tail; + Assert(tail->next == &list->head); + + if (list->head.next != &list->head) + { + elem = list->head.next; + list->head.next = elem->next; + + /* hm.. in a lock-free implementation, updating the tail pointer + here could pose a problem: if an enqueue adds an element + *before* we update the tail pointer, we miss that one. 
*/ + + if (list->head.next == &list->head) + list->tail = &list->head; + } + + return elem; + } ============================================================ *** src/include/storage/shmem.h 3582e34b3882aaf554ca311f1d28fbe03775de6c --- src/include/storage/shmem.h 068b4aff617ae86bce155915e1a8137cb16337c0 *************** *** 21,29 **** --- 21,42 ---- #ifndef SHMEM_H #define SHMEM_H + #include <stddef.h> #include "utils/hsearch.h" + #include "storage/s_lock.h" + /* + * For debug builds with assertions enabled we also enable magic value + * checking to help detect problems in dynamically allocated shared memory + * areas, because buffer overflows there can hurt very badly. + */ + #ifdef USE_ASSERT_CHECKING + #define MAGIC + typedef uint64_t MAGIC_TYPE; + #define MAGIC_VALUE 0x52b32fa9048c71deLL + #endif + /* shmqueue.c */ typedef struct SHM_QUEUE { *************** typedef struct SHM_QUEUE *** 31,37 **** --- 44,84 ---- struct SHM_QUEUE *next; } SHM_QUEUE; + typedef struct DequeElem + { + struct DequeElem *next; + } DequeElem; + + typedef struct Deque + { + DequeElem head; + DequeElem *tail; + slock_t lock; + } Deque; + + extern void deque_init(Deque *list); + extern void deque_enqueue(Deque *list, DequeElem *elem); + extern DequeElem *deque_dequeue(Deque *list); + + /* primalloc.c */ + extern int prim_size(void); + extern void prim_init(void); + extern void* prim_alloc(size_t sz); + extern void prim_free(void *ptr); + extern void prim_check(void *ptr); + + /* wamalloc.c */ + extern int wam_size(void); + extern void wam_init(void); + extern void* wam_alloc(size_t sz); + extern void wam_free(void *ptr); + extern void wam_check(void *ptr); + /* shmem.c */ + + /* TODO: replace with GUC variable to be configurable */ + #define ShmemDynBufferSize 2097152 /* 2 MiB */ + extern void InitShmemAccess(void *seghdr); extern void InitShmemAllocation(void); extern void *ShmemAlloc(Size size); *************** extern Size mul_size(Size s1, Size s2); *** 43,48 **** --- 90,103 ---- extern 
Size add_size(Size s1, Size s2); extern Size mul_size(Size s1, Size s2); + extern int ShmemDynAllocSize(void); + extern void ShmemDynAllocInit(void); + extern void *ShmemDynAlloc(size_t size); + extern void ShmemDynFree(void *ptr); + #ifdef MAGIC + extern void ShmemDynCheck(void *ptr); + #endif + /* ipci.c */ extern void RequestAddinShmemSpace(Size size);
*** src/backend/storage/ipc/imsg.c dc149eef487eafb43409a78b8a33c70e7d3c2bfa --- src/backend/storage/ipc/imsg.c dc149eef487eafb43409a78b8a33c70e7d3c2bfa *************** *** 1,0 **** --- 1,275 ---- + /*------------------------------------------------------------------------- + * + * imsg.c + * + * Internal message passing for process to process communication + * via shared memory. + * + * Copyright (c) 2005-2010, Markus Wanner + * + *------------------------------------------------------------------------- + */ + + #include <unistd.h> + #include <signal.h> + #include <string.h> + + #ifdef HAVE_SYS_FILIO_H + #include <sys/filio.h> + #endif + + #include <sys/ioctl.h> + + #include "postgres.h" + #include "miscadmin.h" + #include "storage/proc.h" + #include "storage/procsignal.h" + #include "storage/imsg.h" + #include "storage/ipc.h" + #include "storage/shmem.h" + #include "storage/spin.h" + #include "utils/elog.h" + + /* global variable pointing to the shmem area */ + IMessageCtlData *IMessageCtl = NULL; + + /* + * flag set by the signal handler, initialized to true to ensure + * IMessageCheck is called at least once after process startup. + */ + static bool got_IMessage = false; + + + /* + * Initialization of shared memory for internal messages. 
+ */ + int + IMessageShmemSize(void) + { + return MAXALIGN(sizeof(IMessageCtlData) + + sizeof(Deque) * (MaxBackends - 1)); + } + + void + IMessageShmemInit(void) + { + bool foundIMessageCtl; + int i; + + #ifdef IMSG_DEBUG + elog(DEBUG3, "IMessageShmemInit(): initializing shared memory"); + #endif + + IMessageCtl = (IMessageCtlData *) + ShmemInitStruct("IMsgCtl", IMessageShmemSize(), + &foundIMessageCtl); + + if (foundIMessageCtl) + return; + + /* initialize the per-backend message sink */ + for (i = 0; i < MaxBackends; i++) + deque_init(&IMessageCtl->lists[i]); + } + + char * + decode_imessage_type(const IMessageType msg_type) + { + switch (msg_type) + { + case IMSGT_TEST: + return "IMSGT_TEST"; + default: + return "unknown message type"; + } + } + + /* + * IMessageCreateInternal + * + * Creates a new but deactivated message within the queue, returning the + * message header of the newly created message or NULL if there's not enough + * space in shared memory for a message of the requested size. + */ + static IMessage* + IMessageCreateInternal(IMessageType type, int msg_size) + { + IMessage *msg; + + msg = (IMessage*) ShmemDynAlloc(sizeof(IMessage) + msg_size); + + if (msg) + { + #ifdef MAGIC + msg->magic = MAGIC_VALUE; + ShmemDynCheck(msg); + #endif + + msg->sender = InvalidBackendId; + msg->type = type; + msg->size = msg_size; + + #ifdef IMSG_DEBUG + elog(DEBUG3, "IMessageCreateInternal(): created message type %s of size %d at %p", + decode_imessage_type(type), (int) msg_size, msg); + #endif + } + + return msg; + } + + /* + * IMessageCreate + * + * Creates a new but deactivated message within the queue, returning the + * message header of the newly created message. Blocks until there is + * enough space available for the message in shared memory, retrying every + * 100ms. 
+ * + * FIXME: this is not the best way to handle out of (imessage) memory, as + * the process wanting to create an IMessage may well receive more + * imessages while it is waiting to create a new one (previously + * created, but not activated ones). However, for most callers this + * would mean having to cache the message in local memory until its + * deliverable. + */ + IMessage* + IMessageCreate(IMessageType type, int msg_size) + { + IMessage *msg; + + while (!(msg = IMessageCreateInternal(type, msg_size))) + { + elog(WARNING, "imessage: waiting for %d bytes to be freed", + (int) sizeof(IMessage) + msg_size); + + pg_usleep(100000); + } + + return msg; + } + + int + IMessageActivate(IMessage *msg, BackendId recipient) + { + Assert(msg); + Assert(recipient >= 0); + Assert(recipient < MaxBackends); + #ifdef MAGIC + Assert(msg->magic == MAGIC_VALUE); + ShmemDynCheck(msg); + #endif + + #ifdef IMSG_DEBUG + elog(DEBUG3, "IMessageActivate(): activating message of type %s and size %d for recipient %d", + decode_imessage_type(msg->type), msg->size, recipient); + #endif + + START_CRIT_SECTION(); + { + /* use volatile pointer to prevent code rearrangement */ + IMessageCtlData *imsgctl = IMessageCtl; + Deque *list = &imsgctl->lists[recipient]; + + SpinLockAcquire(&list->lock); + + msg->sender = MyBackendId; + + deque_enqueue(list, &msg->list); + + SpinLockRelease(&list->lock); + } + END_CRIT_SECTION(); + + return SendProcSignalById(PROCSIG_IMSG_INTERRUPT, recipient); + } + + /* + * IMessageRemove + * + * Marks a message as removable by setting the recipient to null. The message + * will eventually be removed during creation of new messages, see + * IMessageCreate(). + */ + void + IMessageRemove(IMessage *msg) + { + Assert(msg); + #ifdef MAGIC + Assert(msg->magic == MAGIC_VALUE); + #endif + + ShmemDynFree((Pointer) msg); + } + + /* + * HandleIMessageInterrupt + * + * Called on PROCSIG_IMSGT_INTERRUPT, possibly within the signal handler. 
+ */ + void + HandleIMessageInterrupt() + { + got_IMessage = true; + } + + /* + * IMessageCheck + * + * Checks if there is a message in the queue for this process. Returns null + * if there is no message for this process, the message header otherwise. The + * message remains in the queue and should be removed by IMessageRemove(). + */ + IMessage* + IMessageCheck(void) + { + IMessage *msg; + + /* short circuit in case we didn't receive a signal */ + if (!got_IMessage) + return NULL; + + #ifdef IMSG_DEBUG + elog(DEBUG3, "IMessageCheck(): backend %d (pid %d) got imsg interrupt", + MyBackendId, MyProc->pid); + #endif + + msg = NULL; + START_CRIT_SECTION(); + { + /* use volatile pointer to prevent code rearrangement */ + IMessageCtlData *imsgctl = IMessageCtl; + Deque *list = &imsgctl->lists[MyBackendId]; + + SpinLockAcquire(&list->lock); + + msg = (IMessage*) deque_dequeue(list); + + SpinLockRelease(&list->lock); + } + END_CRIT_SECTION(); + + /* + * Reset the flag, if we scanned through the list but didn't find any + * new message. + */ + if (msg == NULL) + got_IMessage = false; + + #ifdef MAGIC + if (msg != NULL) + { + Assert(msg->magic == MAGIC_VALUE); + ShmemDynCheck(msg); + } + #endif + + #ifdef IMSG_DEBUG + if (msg != NULL) + elog(DEBUG3, "IMessageCheck(): new message at %p of type %s and size %d for [%d/%d]", + msg, decode_imessage_type(msg->type), msg->size, + MyBackendId, MyProcPid); + #endif + + return msg; + } ============================================================ *** src/include/storage/imsg.h d7d3960f42396bef49eb598411c215271570330e --- src/include/storage/imsg.h d7d3960f42396bef49eb598411c215271570330e *************** *** 1,0 **** --- 1,76 ---- + /*------------------------------------------------------------------------- + * + * imsg.h + * + * Internal message passing for process to process communication + * via shared memory. 
+ * + * Copyright (c) 2005-2010, Markus Wanner + * + *------------------------------------------------------------------------- + */ + + #ifndef IMSG_H + #define IMSG_H + + #include <sys/types.h> + + #include "c.h" + #include "storage/backendid.h" + #include "storage/shmem.h" + #include "storage/spin.h" + + /* get a data pointer from the header */ + #define IMSG_DATA(imsg) ((Pointer) ((Pointer) imsg + sizeof(IMessage))) + + /* + * Message types + */ + typedef enum + { + IMSGT_TEST = 'T' /* test message type */ + } IMessageType; + + /* + * Message descriptor in front of the message + */ + typedef struct + { + DequeElem list; + + #ifdef MAGIC + uint64 magic; + #endif + + /* backend id of the sender, null means not yet activated message */ + BackendId sender; + + /* message type */ + IMessageType type; + + /* message size following, but not including this header */ + int size; + } IMessage; + + /* + * shared-memory pool for internal messages. + */ + typedef struct + { + /* a singly linked list per backend */ + Deque lists[1]; + } IMessageCtlData; + + /* routines to send and receive internal messages */ + extern int IMessageShmemSize(void); + extern void IMessageShmemInit(void); + extern IMessage* IMessageCreate(IMessageType type, int msg_size); + extern int IMessageActivate(IMessage *msg, BackendId recipient); + extern void IMessageRemove(IMessage *msg); + extern void HandleIMessageInterrupt(void); + extern IMessage* IMessageCheck(void); + + /* mainly for debugging purposes */ + extern char* decode_imessage_type(const IMessageType msg_type); + + #endif /* IMSG_H */ ============================================================ *** src/backend/storage/ipc/Makefile 0610a969a64989edaa2c19f9c25362bd888bdeb9 --- src/backend/storage/ipc/Makefile c1e3d9da5c3fa8e27f25f03c0cdd5b8149fb5a4c *************** endif *** 15,21 **** endif endif ! OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \ ! 
sinval.o sinvaladt.o standby.o wamalloc.o include $(top_srcdir)/src/backend/common.mk --- 15,21 ---- endif endif ! OBJS = imsg.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o \ ! shmqueue.o sinval.o sinvaladt.o standby.o wamalloc.o include $(top_srcdir)/src/backend/common.mk ============================================================ *** src/backend/storage/ipc/ipci.c 2c2131711f38125a39ec0c9dfaef970ca873d142 --- src/backend/storage/ipc/ipci.c 1e5026e411d3728825b8e18dc01a9a0ec9ce974d *************** *** 29,34 **** --- 29,35 ---- #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" + #include "storage/imsg.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" #include "storage/pmsignal.h" *************** CreateSharedMemoryAndSemaphores(bool mak *** 125,130 **** --- 126,132 ---- size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, IMessageShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif *************** CreateSharedMemoryAndSemaphores(bool mak *** 234,239 **** --- 236,242 ---- BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + IMessageShmemInit(); #ifdef EXEC_BACKEND ============================================================ *** src/backend/storage/ipc/procsignal.c 0d72e226ad1e335340a408bdf250c871ee963e7e --- src/backend/storage/ipc/procsignal.c a86dc97e08f3efa9f69dda1d6815aff0156bf0dc *************** *** 20,25 **** --- 20,26 ---- #include "bootstrap/bootstrap.h" #include "commands/async.h" #include "miscadmin.h" + #include "storage/imsg.h" #include "storage/ipc.h" #include "storage/procsignal.h" #include "storage/shmem.h" *************** SendProcSignal(pid_t pid, ProcSignalReas *** 224,229 **** --- 225,253 ---- } /* + * SendProcSignalById + * Send a signal to a Postgres process identified by its backend id. 
+ * + * This variation of SendProcSignal doesn't check the process id, thus it's + * chance of signaling the wrong backend is even higher. Users need to cope + * with signals erroneously delivered to a newly started backend. + */ + int + SendProcSignalById(ProcSignalReason reason, BackendId backendId) + { + volatile ProcSignalSlot *slot; + + Assert(backendId != InvalidBackendId); + + slot = &ProcSignalSlots[backendId - 1]; + + /* Atomically set the proper flag */ + slot->pss_signalFlags[reason] = true; + /* Send signal */ + return kill(slot->pss_pid, SIGUSR1); + } + + /* * CheckProcSignal - check to see if a particular reason has been * signaled, and clear the signal flag. Should be called after receiving * SIGUSR1. *************** procsignal_sigusr1_handler(SIGNAL_ARGS) *** 278,282 **** --- 302,309 ---- if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_IMSG_INTERRUPT)) + HandleIMessageInterrupt(); + errno = save_errno; } ============================================================ *** src/include/storage/procsignal.h 371d9c8b77490ffca09bd5227e658af452dae5b4 --- src/include/storage/procsignal.h dcecd1a312db4fe0aff6cb6b5037e109677bfc8b *************** typedef enum *** 31,36 **** --- 31,37 ---- { PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ + PROCSIG_IMSG_INTERRUPT, /* internal message received */ /* Recovery conflict reasons */ PROCSIG_RECOVERY_CONFLICT_DATABASE, *************** extern int SendProcSignal(pid_t pid, Pro *** 52,57 **** --- 53,59 ---- extern void ProcSignalInit(int pss_idx); extern int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId); + extern int SendProcSignalById(ProcSignalReason reason, BackendId backendId); extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers