On 3/19/19 4:44 PM, Chris Travers wrote:
> On Tue, Mar 19, 2019 at 12:19 PM Tomas Vondra
> <tomas.von...@2ndquadrant.com <mailto:tomas.von...@2ndquadrant.com>> wrote:
>
> > On 3/19/19 10:59 AM, Chris Travers wrote:
> > >
> > > Not discussing whether any particular committer should pick this up but
> > > I want to discuss an important use case we have at Adjust for this sort
> > > of patch.
> > >
> > > The PostgreSQL compression strategy is something we find inadequate for
> > > at least one of our large deployments (a large debug log spanning
> > > 10PB+). Our current solution is to set storage so that it does not
> > > compress and then run on ZFS to get compression speedups on spinning
> > > disks.
> > >
> > > But running PostgreSQL on ZFS has some annoying costs because we have
> > > copy-on-write on copy-on-write, and when you add file fragmentation... I
> > > would really like to be able to get away from having to do ZFS as an
> > > underlying filesystem. While we have good write throughput, read
> > > throughput is not as good as I would like.
> > >
> > > An approach that would give us better row-level compression would allow
> > > us to ditch the COW filesystem under PostgreSQL approach.
> > >
> > > So I think the benefits are actually quite high particularly for those
> > > dealing with volume/variety problems where things like JSONB might be a
> > > go-to solution. Similarly I could totally see having systems which
> > > handle large amounts of specialized text having extensions for dealing
> > > with these.
> >
> > Sure, I don't disagree - the proposed compression approach may be a big
> > win for some deployments further down the road, no doubt about it. But
> > as I said, it's unclear when we get there (or if the interesting stuff
> > will be in some sort of extension, which I don't oppose in principle).
>
> I would assume that if extensions are particularly stable and useful
> they could be moved into core.
>
> But I would also assume that at first, this area would be sufficiently
> experimental that folks (like us) would write our own extensions for it.
>
> > > > But hey, I think there are committers working for postgrespro, who
> > > > might have the motivation to get this over the line. Of course,
> > > > assuming that there are no serious objections to having this
> > > > functionality or how it's implemented ... But I don't think that was
> > > > the case.
> > >
> > > While I am not currently able to speak for questions of how it is
> > > implemented, I can say with very little doubt that we would almost
> > > certainly use this functionality if it were there and I could see plenty
> > > of other cases where this would be a very appropriate direction for some
> > > other projects as well.
> >
> > Well, I guess the best thing you can do to move this patch forward is to
> > actually try that on your real-world use case, and report your results
> > and possibly do a review of the patch.
>
> Yeah, I expect to do this within the next month or two.
>
> > IIRC there was an extension [1] leveraging this custom compression
> > interface for better jsonb compression, so perhaps that would work for
> > you (not sure if it's up to date with the current patch, though).
> >
> > [1]
> > https://www.postgresql.org/message-id/20171130182009.1b492eb2%40wp.localdomain
>
> Yeah I will be looking at a couple different approaches here and
> reporting back. I don't expect it will be a full production workload but
> I do expect to be able to report on benchmarks in both storage and
> performance.
FWIW I was a bit curious how that jsonb compression would affect the data
set I'm using for testing the jsonpath patches, so I spent a bit of time
getting it to work with master. The attached patch gets it to compile, but
unfortunately it then fails like this:

    ERROR:  jsonbd: worker has detached

It seems there's some bug in how shm_mq is used, but I don't have time to
investigate that further.
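To show where that error comes from, here's a minimal sketch of the shm_mq
round-trip jsonbd does between a backend and a dictionary worker
(hypothetical function and variable names, not the extension's actual
code) - "worker has detached" style errors get raised when one of these
calls returns SHM_MQ_DETACHED:

    #include "postgres.h"
    #include "storage/shm_mq.h"

    /*
     * Send a request to the worker and wait for its reply.  Both queue
     * handles are assumed to be attached already (shm_mq_attach), with
     * sender/receiver set on each underlying queue.
     */
    static char *
    request_from_worker(shm_mq_handle *out, shm_mq_handle *in, const char *req)
    {
        shm_mq_result res;
        Size          len;
        void         *data;

        /* blocking send; SHM_MQ_DETACHED means the peer went away */
        res = shm_mq_send(out, strlen(req) + 1, req, false);
        if (res == SHM_MQ_DETACHED)
            elog(ERROR, "jsonbd: worker has detached");

        /* blocking receive of the worker's reply */
        res = shm_mq_receive(in, &len, &data, false);
        if (res == SHM_MQ_DETACHED)
            elog(ERROR, "jsonbd: worker has detached");

        return pstrdup((char *) data);
    }

Since the extension hard-codes a copy of the (private) shm_mq struct to
peek at mq_detached, my first suspicion would be that copy being out of
sync with core - which is also what the patch below has to adjust.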
regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


diff --git a/jsonbd.c b/jsonbd.c
index 511cfcf..9b86e1f 100644
--- a/jsonbd.c
+++ b/jsonbd.c
@@ -766,11 +766,6 @@ jsonbd_cminitstate(Oid acoid, List *options)
 	return NULL;
 }
 
-static void
-jsonbd_cmdrop(Oid acoid)
-{
-	/* TODO: if there is no compression options, remove the dictionary */
-}
 
 static struct varlena *
 jsonbd_cmdecompress(CompressionAmOptions *cmoptions, const struct varlena *data)
@@ -863,7 +858,6 @@ jsonbd_compression_handler(PG_FUNCTION_ARGS)
 	CompressionAmRoutine *routine = makeNode(CompressionAmRoutine);
 
 	routine->cmcheck = jsonbd_cmcheck;
-	routine->cmdrop = jsonbd_cmdrop;		/* no drop behavior */
 	routine->cminitstate = jsonbd_cminitstate;
 	routine->cmcompress = jsonbd_cmcompress;
 	routine->cmdecompress = jsonbd_cmdecompress;
diff --git a/jsonbd_utils.c b/jsonbd_utils.c
index 3ab9347..029e2e3 100644
--- a/jsonbd_utils.c
+++ b/jsonbd_utils.c
@@ -2,9 +2,11 @@
 #include "jsonbd_utils.h"
 
 #include "postgres.h"
+#include "access/genam.h"
 #include "access/htup_details.h"
 #include "access/xact.h"
 #include "access/sysattr.h"
+#include "access/table.h"
 #include "catalog/indexing.h"
 #include "catalog/pg_extension.h"
 #include "commands/extension.h"
@@ -13,22 +15,20 @@
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
 
-#if PG_VERSION_NUM == 110000
 struct shm_mq_alt
 {
 	slock_t		mq_mutex;
-	PGPROC	   *mq_receiver;	/* we need this one */
-	PGPROC	   *mq_sender;		/* this one */
-	uint64		mq_bytes_read;
-	uint64		mq_bytes_written;
+	PGPROC	   *mq_receiver;
+	PGPROC	   *mq_sender;
+	pg_atomic_uint64 mq_bytes_read;
+	pg_atomic_uint64 mq_bytes_written;
 	Size		mq_ring_size;
-	bool		mq_detached;	/* and this one */
+	bool		mq_detached;
+	uint8		mq_ring_offset;
+	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
 
 	/* in postgres version there are more attributes, but we don't need them */
 };
-#else
-#error "shm_mq struct in jsonbd is copied from PostgreSQL 11, please correct it according to your version"
-#endif
 
 /**
  * Get 32-bit Murmur3 hash. Ported from qLibc library.
@@ -153,7 +153,7 @@ get_jsonbd_schema(void)
 		return InvalidOid;	/* exit if pg_pathman does not exist */
 
 	ScanKeyInit(&entry[0],
-				ObjectIdAttributeNumber,
+				Anum_pg_extension_oid,
 				BTEqualStrategyNumber, F_OIDEQ,
 				ObjectIdGetDatum(ext_oid));
diff --git a/jsonbd_worker.c b/jsonbd_worker.c
index e9cb5b6..c59e102 100644
--- a/jsonbd_worker.c
+++ b/jsonbd_worker.c
@@ -13,11 +13,11 @@
 #include "access/xact.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
-#include "catalog/pg_compression_opt.h"
 #include "catalog/pg_extension.h"
 #include "commands/extension.h"
 #include "commands/dbcommands.h"
 #include "executor/spi.h"
+#include "access/tableam.h"
 #include "port/atomics.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
@@ -32,7 +32,6 @@
 #include "utils/rel.h"
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
-#include "utils/tqual.h"
 #include "utils/syscache.h"
 
 static bool xact_started = false;
@@ -125,7 +124,7 @@ init_worker(dsm_segment *seg)
 	worker_args = (jsonbd_worker_args *) dsm_segment_address(seg);
 
 	/* Connect to our database */
-	BackgroundWorkerInitializeConnectionByOid(worker_args->dboid, InvalidOid);
+	BackgroundWorkerInitializeConnectionByOid(worker_args->dboid, InvalidOid, 0);
 
 	worker_state = shm_toc_lookup(toc, worker_args->worker_num, false);
 	worker_state->proc = MyProc;
@@ -205,8 +204,8 @@ jsonbd_get_key(Relation rel, Relation indrel, Oid cmoptoid, uint32 key_id)
 	IndexScanDesc	scan;
 	ScanKeyData		skey[2];
 	Datum			key_datum;
-	HeapTuple		tup;
 	bool			isNull;
+	TupleTableSlot *slot;
 	MemoryContext	old_mcxt;
 	char		   *res;
@@ -224,19 +223,22 @@ jsonbd_get_key(Relation rel, Relation indrel, Oid cmoptoid, uint32 key_id)
 	scan = index_beginscan(rel, indrel, SnapshotAny, 2, 0);
 	index_rescan(scan, skey, 2, NULL, 0);
 
-	tup = index_getnext(scan, ForwardScanDirection);
-	if (tup == NULL)
+	slot = table_slot_create(rel, NULL);
+
+	if (!index_getnext_slot(scan, ForwardScanDirection, slot))
 		elog(ERROR, "key not found for cmopt=%d and id=%d", cmoptoid, key_id);
 
-	key_datum = heap_getattr(tup, JSONBD_DICTIONARY_REL_ATT_KEY,
-							 RelationGetDescr(rel), &isNull);
+	key_datum = slot_getattr(slot, JSONBD_DICTIONARY_REL_ATT_KEY, &isNull);
 	Assert(!isNull);
 
 	old_mcxt = MemoryContextSwitchTo(worker_context);
 	res = TextDatumGetCString(key_datum);
 	MemoryContextSwitchTo(old_mcxt);
 
+	ExecDropSingleTupleTableSlot(slot);
+
 	index_endscan(scan);
+
 	return res;
 }
@@ -308,7 +310,7 @@ jsonbd_get_key_id(Relation rel, Relation indrel, Oid cmoptoid, char *key)
 {
 	IndexScanDesc	scan;
 	ScanKeyData		skey[2];
-	HeapTuple		tup;
+	TupleTableSlot *slot;
 	bool			isNull;
 	uint32			result = 0;
@@ -326,16 +328,18 @@ jsonbd_get_key_id(Relation rel, Relation indrel, Oid cmoptoid, char *key)
 	scan = index_beginscan(rel, indrel, SnapshotAny, 2, 0);
 	index_rescan(scan, skey, 2, NULL, 0);
 
-	tup = index_getnext(scan, ForwardScanDirection);
-	if (tup != NULL)
+	slot = table_slot_create(rel, NULL);
+
+	if (index_getnext_slot(scan, ForwardScanDirection, slot))
 	{
-		Datum dat = heap_getattr(tup, JSONBD_DICTIONARY_REL_ATT_ID,
-								 RelationGetDescr(rel), &isNull);
+		Datum	dat = slot_getattr(slot, JSONBD_DICTIONARY_REL_ATT_ID, &isNull);
 		Assert(!isNull);
 		result = DatumGetInt32(dat);
 	}
 	index_endscan(scan);
+	ExecDropSingleTupleTableSlot(slot);
+
 	return result;
 }
@@ -547,7 +551,7 @@ jsonbd_launcher_main(Datum arg)
 	BackgroundWorkerUnblockSignals();
 
 	/* Init this launcher as backend so workers can notify it */
-	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false);
 
 	/* Create resource owner */
 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "jsonbd_launcher_main");
@@ -821,10 +825,10 @@ jsonbd_register_worker(int worker_num, Oid dboid, int database_num)
 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
 	worker.bgw_restart_time = 0;
 	worker.bgw_notify_pid = MyProcPid;
-	memcpy(worker.bgw_library_name, "jsonbd", BGW_MAXLEN);
-	memcpy(worker.bgw_function_name, CppAsString(jsonbd_worker_main), BGW_MAXLEN);
-	snprintf(worker.bgw_name, BGW_MAXLEN, "jsonbd, worker %d, db: %d",
-			 worker_num, dboid);
+	strcpy(worker.bgw_library_name, "jsonbd");
+	strcpy(worker.bgw_function_name, "jsonbd_worker_main");
+	strcpy(worker.bgw_name, "jsonbd worker");
+	strcpy(worker.bgw_type, "jsonbd worker");
 	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
 
 	/* Start dynamic worker */
@@ -835,11 +839,7 @@ jsonbd_register_worker(int worker_num, Oid dboid, int database_num)
 	}
 
 	/* Wait to be signalled. */
-#if PG_VERSION_NUM >= 100000
-	WaitLatch(&hdr->launcher_latch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
-#else
-	WaitLatch(&hdr->launcher_latch, WL_LATCH_SET, 0);
-#endif
+	WaitLatch(&hdr->launcher_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, PG_WAIT_EXTENSION);
 
 	ResetLatch(&hdr->launcher_latch);
 
@@ -892,14 +892,14 @@ jsonbd_get_dictionary_relid(void)
 		Assert(relid != InvalidOid);
 
-		rel = relation_open(relid, NoLock);
+		rel = relation_open(relid, AccessShareLock);
 		indexes = RelationGetIndexList(rel);
 		Assert(list_length(indexes) == 2);
 		foreach(lc, indexes)
 		{
 			Oid			indOid = lfirst_oid(lc);
-			Relation	indRel = index_open(indOid, NoLock);
+			Relation	indRel = index_open(indOid, AccessShareLock);
 			int			attnum = indRel->rd_index->indkey.values[1];
 
 			if (attnum == JSONBD_DICTIONARY_REL_ATT_ID)
@@ -910,9 +910,9 @@ jsonbd_get_dictionary_relid(void)
 				jsonbd_keys_indoid = indOid;
 			}
 
-			index_close(indRel, NoLock);
+			index_close(indRel, AccessShareLock);
 		}
-		relation_close(rel, NoLock);
+		relation_close(rel, AccessShareLock);
 	}
 
 	finish_xact_command();
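
One more thought on that copied struct: with the #if PG_VERSION_NUM guard
removed, nothing will catch the copy drifting from core's shm_mq again. If
I read shm_mq.c correctly, shm_mq_minimum_size is the maxaligned header
size of the real struct plus alignment slop, so a cheap (one-directional)
sanity check could sit next to the struct in jsonbd_utils.c. A hypothetical
sketch, not part of the attached patch:

    #include "postgres.h"
    #include "storage/shm_mq.h"

    /*
     * If our copy's ring offset exceeds the real header size, the two
     * layouts have definitely diverged.  The opposite drift (our copy
     * being too short) this cannot catch, but it's better than nothing.
     * Assumes struct shm_mq_alt from jsonbd_utils.c is in scope.
     */
    static void
    check_shm_mq_layout(void)
    {
        if (MAXALIGN(offsetof(struct shm_mq_alt, mq_ring)) > shm_mq_minimum_size)
            elog(ERROR, "jsonbd: shm_mq_alt layout is out of sync with shm_mq");
    }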