Changeset: 8c7be52d8968 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8c7be52d8968 Added Files: monetdb5/modules/mal/mosaic.c monetdb5/modules/mal/mosaic.h monetdb5/modules/mal/mosaic.mal monetdb5/modules/mal/mosaic_none.c monetdb5/modules/mal/mosaic_rle.c Modified Files: monetdb5/modules/mal/mal_init.mal Branch: mosaic Log Message:
Framework for chunk-based compressions Initial phase in compression for heaps based on none/runlengthencoding diffs (truncated from 1414 to 300 lines): diff --git a/monetdb5/modules/mal/mal_init.mal b/monetdb5/modules/mal/mal_init.mal --- a/monetdb5/modules/mal/mal_init.mal +++ b/monetdb5/modules/mal/mal_init.mal @@ -103,6 +103,7 @@ include txtsim; include tokenizer; include zorder; +include mosaic; # include logger; include transaction; diff --git a/monetdb5/modules/mal/mosaic.c b/monetdb5/modules/mal/mosaic.c new file mode 100644 --- /dev/null +++ b/monetdb5/modules/mal/mosaic.c @@ -0,0 +1,815 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.monetdb.org/Legal/MonetDBLicense + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * + * The Initial Developer of the Original Code is CWI. + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. + * Copyright August 2008-2014 MonetDB B.V. + * All Rights Reserved. + */ + +/* + * (c) 2014 author Martin Kersten + * Adaptive compression scheme to reduce the storage footprint for stable persistent data. +*/ + +#include "monetdb_config.h" +#include "mosaic.h" +#include "mtime.h" +#include "math.h" +#include "opt_prelude.h" +#include "algebra.h" + +//#define _DEBUG_MOSAIC_ + +/* do not invest in compressing BATs smaller than this */ +#define MIN_INPUT_COUNT 1 + +/* The compressor kinds currently hardwired */ +#define MOSAIC_METHODS 7 +#define MOSAIC_NONE 0 // no compression at all +#define MOSAIC_RLE 1 // use run-length encoding +#define MOSAIC_FRONT 2 // use front compression for >=4 byte fields +#define MOSAIC_DELTATA 3 // use delta encoding +#define MOSAIC_BITMAPS 4 // use limited set of bitmaps +#define MOSAIC_RANGE 5 // use linear model +#define MOSAIC_GUASSIAN 6 // use guassian model fitting + +#define MOSAIC_BITS 48 // maximum number of elements to compress + +//Compression should have a significant reduction to apply. +#define COMPRESS_THRESHOLD 50 //percent + +/* + * The header is reserved for meta information, e.g. oid indices. + * The block header encodes the information needed for the chunk decompressor + */ +typedef struct MOSAICHEADER{ + int mosaicversion; + oid index[1000]; + lng offset[1000]; +} * MosaicHeader; + +typedef struct MOSAICBLOCK{ + lng tag:4, // method applied in chunk + cnt:MOSAIC_BITS; // compression specific information +} *MosaicBlk; + +#define MosaicHdrSize sizeof(struct MOSAICHEADER) +#define MosaicBlkSize sizeof(struct MOSAICBLOCK) + +#define wordaligned(X,SZ) \ + X = ((char*)X) + (SZ) + ((SZ) % sizeof(int)? sizeof(int) - (SZ)%sizeof(int) : 0) + + +typedef struct MOSTASK{ + int type; // one of the permissible types + BUN elm; // elements left to compress + char *srcheap; // start in source heap + char *dstheap; // start of the destination heap + char *src, *compressed;// read pointer into source, write pointer into destination + MosaicBlk hdr; // current block header + + // The competing compression scheme leave the number of elements and compressed size + lng elements[MOSAIC_METHODS]; + lng xsize[MOSAIC_METHODS]; + lng time[MOSAIC_METHODS]; + // collect compression statistics for the particular task + lng timing[MOSAIC_METHODS]; + lng winners[MOSAIC_METHODS]; + int percentage[MOSAIC_METHODS]; // compression size for the last batch 0..100 percent +} *MOStask; + +/* we keep a condensed OID index anchored to the compressed blocks */ + +typedef struct MOSINDEX{ + lng offset; // header location within compressed heap + lng nullcnt; // number of nulls encountered + ValRecord low,hgh; // zone value markers for fix-length types +} *mosaicindex; + +/* Run through a column to produce a compressed version */ + +/* simple include the details of the hardwired compressors */ +#include "mosaic_none.c" +#include "mosaic_rle.c" + +#ifdef _DEBUG_MOSAIC_ +static void +MOSdumpTask(Client cntxt,MOStask task) +{ + int i; + mnstr_printf(cntxt->fdout,"#type %d todo "LLFMT"\n", task->type, (lng)task->elm); + mnstr_printf(cntxt->fdout,"#winners "); + for(i=0; i< MOSAIC_METHODS; i++) + mnstr_printf(cntxt->fdout,LLFMT " ",task->winners[i]); + mnstr_printf(cntxt->fdout,"\n#elements "); + for(i=0; i< MOSAIC_METHODS; i++) + mnstr_printf(cntxt->fdout,LLFMT " ",task->elements[i]); + mnstr_printf(cntxt->fdout,"\n#xsize "); + for(i=0; i< MOSAIC_METHODS; i++) + mnstr_printf(cntxt->fdout,LLFMT " ",task->xsize[i]); + mnstr_printf(cntxt->fdout,"\n#time "); + for(i=0; i< MOSAIC_METHODS; i++) + mnstr_printf(cntxt->fdout, LLFMT" ",task->time[i]); + mnstr_printf(cntxt->fdout,"\n#percentage "); + for(i=0; i< MOSAIC_METHODS; i++) + mnstr_printf(cntxt->fdout, "%d ",task->percentage[i]); + mnstr_printf(cntxt->fdout,"\n#timing "); + for(i=0; i< MOSAIC_METHODS; i++) + mnstr_printf(cntxt->fdout, LLFMT" ",task->timing[i]); + mnstr_printf(cntxt->fdout,"\n"); +} +#endif + +static void +MOSdumpInternal(Client cntxt, BAT *b){ + MOStask task=0; + // loop thru the chunks + MT_lock_set(&mal_profileLock,"mosaicdump"); + task= (MOStask) GDKzalloc(sizeof(*task)); + task->type = b->ttype; + task->elm = b->T->heap.count; + task->compressed = task->srcheap = (void*) Tloc(b, BUNfirst(b)); + task->compressed += MosaicHdrSize; + task->hdr = (MosaicBlk) task->compressed; + while(task->elm >0){ + switch(task->hdr->tag){ + case MOSAIC_NONE: MOSdump_none(cntxt,task); break; + case MOSAIC_RLE: MOSdump_rle(cntxt,task); break; + default: assert(0); + } + } + MT_lock_unset(&mal_profileLock,"mosaicdump"); +} +str +MOSdump(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int bid = *(int*) getArgReference(stk,pci,1); + BAT *b; + str msg = MAL_SUCCEED; + + (void) mb; + + if ((b = BATdescriptor(bid)) == NULL) + throw(MAL,"mosaic.dump",INTERNAL_BAT_ACCESS); + if ( !b->T->heap.compressed){ + BBPreleaseref(bid); + return MAL_SUCCEED; + } + MOSdumpInternal(cntxt,b); + BBPreleaseref(bid); + return msg; +} + +static BAT* +inheritCOL( BAT *bn, COLrec *cn, BAT *b, COLrec *c, bat p ) +{ + /* inherit column as view */ + str nme = cn->id; + + assert((b->H == c && p == VIEWhparent(b)) || + (b->T == c && p == VIEWtparent(b))); + assert(bn->H == cn || bn->T == cn); + assert(cn->props == NULL); + assert(cn->vheap == NULL); + assert(cn->hash == NULL); + assert(bn->S->deleted == b->S->deleted ); + assert(bn->S->first == b->S->first ); + assert(bn->S->inserted == b->S->inserted); + assert(bn->S->count == b->S->count ); + + HEAPfree(&cn->heap); + + if (p == 0) + p = b->batCacheid; + bn->S->capacity = MIN( bn->S->capacity, b->S->capacity ); + *cn = *c; + BBPshare(p); + if (cn->vheap) { + assert(cn->vheap->parentid > 0); + BBPshare(cn->vheap->parentid); + } + cn->heap.copied = 0; + cn->props = NULL; + cn->heap.parentid = p; + cn->id = nme; + if (isVIEW(b)) + cn->hash = NULL; + else + cn->hash = c->hash; + + return bn; +} + +/* + * Compression is focussed on a single column. + * Multiple compression techniques are applied at the same time. + */ + +static void +MOSinit(MOStask task){ + if ( task->elm > 0){ + task->hdr = (MosaicBlk) task->compressed; + task->hdr->tag = MOSAIC_NONE; + task->hdr->cnt = 0; + wordaligned(task->compressed,MosaicBlkSize); + //task->compressed += MosaicBlkSize; // beware of byte alignment + } +} + +static void +MOSclose(MOStask task){ + if( task->hdr->cnt == 0){ + wordaligned(task->compressed,-MosaicBlkSize); + //task->compressed -= MosaicBlkSize; // beware of byte alignment + return; + } +} + +str +MOScompressInternal(Client cntxt, int *ret, int *bid, int threshold) +{ + BAT *b, *bn; + BUN cnt; + str msg = MAL_SUCCEED; + MOStask task; + int cand; + lng chunksize=0, ch; + + if( threshold == 0) + threshold = COMPRESS_THRESHOLD; + if ((b = BATdescriptor(*bid)) == NULL) + throw(MAL, "mosaic.compress", INTERNAL_BAT_ACCESS); + + if ( b->ttype == TYPE_void){ + // void columns are already compressed + BBPreleaseref(*ret = b->batCacheid); + return msg; + } + + if (b->T->heap.compressed) { + BBPreleaseref(b->batCacheid); + return msg; // don't compress twice + } + + cnt = BATcount(b); + if ( isVIEWCOMBINE(b) || cnt < MIN_INPUT_COUNT ){ + /* no need to compress */ + BBPkeepref(*ret = b->batCacheid); + return msg; + } + +#ifdef _DEBUG_MOSAIC_ + mnstr_printf(cntxt->fdout,"#compress bat %d threshold %d\n",*bid,threshold); +#endif + // allocate space for the compressed version. + // It should always take less space then the orginal column. + // But be prepared that a last block header may be stored + // use a small size overshoot + bn = BATnew( TYPE_void, b->ttype, cnt+MosaicBlkSize, TRANSIENT); + if (bn == NULL) { + BBPreleaseref(b->batCacheid); + throw(MAL,"mosaic.compress", MAL_MALLOC_FAIL); + } + _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list