Changeset: 8c7be52d8968 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8c7be52d8968
Added Files:
        monetdb5/modules/mal/mosaic.c
        monetdb5/modules/mal/mosaic.h
        monetdb5/modules/mal/mosaic.mal
        monetdb5/modules/mal/mosaic_none.c
        monetdb5/modules/mal/mosaic_rle.c
Modified Files:
        monetdb5/modules/mal/mal_init.mal
Branch: mosaic
Log Message:

Framework for chunk-based compressions
Initial phase in compression for heaps based on none/runlengthencoding


diffs (truncated from 1414 to 300 lines):

diff --git a/monetdb5/modules/mal/mal_init.mal b/monetdb5/modules/mal/mal_init.mal
--- a/monetdb5/modules/mal/mal_init.mal
+++ b/monetdb5/modules/mal/mal_init.mal
@@ -103,6 +103,7 @@ include txtsim;
 include tokenizer;
 include zorder;
 
+include mosaic;
 # include logger;
 include transaction;
 
diff --git a/monetdb5/modules/mal/mosaic.c b/monetdb5/modules/mal/mosaic.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/mal/mosaic.c
@@ -0,0 +1,815 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.monetdb.org/Legal/MonetDBLicense
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the MonetDB Database System.
+ *
+ * The Initial Developer of the Original Code is CWI.
+ * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
+ * Copyright August 2008-2014 MonetDB B.V.
+ * All Rights Reserved.
+ */
+
+/*
+ * (c) 2014 author Martin Kersten
+ * Adaptive compression scheme to reduce the storage footprint for stable
+ * persistent data.
+ */
+
+#include "monetdb_config.h"
+#include "mosaic.h"
+#include "mtime.h"
+#include "math.h"
+#include "opt_prelude.h"
+#include "algebra.h"
+
+//#define _DEBUG_MOSAIC_
+
+/* do not invest in compressing BATs smaller than this */
+#define MIN_INPUT_COUNT 1
+
+/*
+ * The compressor kinds currently hardwired.
+ * The method values are what gets stored in the 'tag' field of a block
+ * header (struct MOSAICBLOCK); MOSAIC_METHODS is the number of kinds and
+ * sizes the per-method arrays in struct MOSTASK.
+ */
+#define MOSAIC_METHODS 7
+#define MOSAIC_NONE     0              // no compression at all
+#define MOSAIC_RLE      1              // use run-length encoding
+#define MOSAIC_FRONT    2              // use front compression for >=4 byte fields
+#define MOSAIC_DELTATA    3            // use delta encoding
+#define MOSAIC_BITMAPS  4              // use limited set of bitmaps
+#define MOSAIC_RANGE    5              // use linear model
+#define MOSAIC_GUASSIAN 6              // use Gaussian model fitting
+
+// bit width of the 'cnt' field in struct MOSAICBLOCK
+#define MOSAIC_BITS 48
+
+// Compression should have a significant reduction to apply.
+// Used by MOScompressInternal as the default when it is called with threshold 0.
+#define COMPRESS_THRESHOLD 50   //percent
+
+/*
+ * The header is reserved for meta information, e.g. oid indices.
+ * The block header encodes the information needed for the chunk decompressor
+ *
+ * MOSdumpInternal skips MosaicHdrSize bytes (the size of this struct) at
+ * the start of the tail heap before it reads the first block header.
+ * Note that MosaicHeader is a pointer typedef.
+ */
+typedef struct MOSAICHEADER{
+       // NOTE(review): presumably a format version number; not read or written in the visible code
+       int mosaicversion;
+       // NOTE(review): index[] and offset[] look like parallel arrays (oid -> heap offset); confirm against the compressor
+       oid index[1000];
+       lng offset[1000];
+} * MosaicHeader;
+
+/*
+ * Per-chunk block header. MOSinit places one at the current write
+ * position, sets tag to MOSAIC_NONE and cnt to 0, and then advances the
+ * write pointer past it; MOSclose treats cnt == 0 as an empty block.
+ * Both members are bit-fields declared on type lng.
+ * Note that MosaicBlk is a pointer typedef.
+ */
+typedef struct MOSAICBLOCK{
+       lng tag:4,              // method applied in chunk
+       cnt:MOSAIC_BITS;        // compression specific information
+} *MosaicBlk; 
+
+/* byte sizes of the heap header and of one block header */
+#define MosaicHdrSize  sizeof(struct MOSAICHEADER)
+#define MosaicBlkSize  sizeof(struct MOSAICBLOCK)
+
+/*
+ * Advance the pointer lvalue X by SZ bytes, with SZ rounded up to the next
+ * multiple of sizeof(int). Only the step is rounded; X itself is not
+ * inspected, so the result is int-aligned only if X was before the call.
+ * X and SZ are evaluated more than once, so pass side-effect free expressions.
+ * SZ must be a non-negative byte count: a negated size_t wraps to a huge
+ * unsigned value and the padding term is then derived from that wrapped value.
+ */
+#define wordaligned(X,SZ) \
+       ((X) = ((char*)(X)) + (SZ) + ((SZ) % sizeof(int)? sizeof(int) - (SZ)%sizeof(int) : 0))
+
+
+/*
+ * Working state of one pass over a single column.
+ * MOSdumpInternal obtains it from GDKzalloc, so every field it does not
+ * set explicitly starts out as zero.
+ * Note that MOStask is a pointer typedef.
+ */
+typedef struct MOSTASK{
+       int type;               // one of the permissible types (MOSdumpInternal copies the BAT tail type)
+       BUN     elm;            // elements left to compress
+       char *srcheap;  // start in source heap
+       char *dstheap;  // start of the destination heap
+       char *src, *compressed;// read pointer into source, write pointer into destination
+       MosaicBlk hdr;  // current block header
+
+       // The competing compression schemes leave the number of elements and compressed size
+       lng elements[MOSAIC_METHODS];
+       lng xsize[MOSAIC_METHODS];
+       lng time[MOSAIC_METHODS];
+       // collect compression statistics for the particular task
+       lng timing[MOSAIC_METHODS];
+       lng winners[MOSAIC_METHODS];
+       int percentage[MOSAIC_METHODS]; // compression size for the last batch 0..100 percent
+} *MOStask;
+
+/*
+ * we keep a condensed OID index anchored to the compressed blocks
+ *
+ * Note that mosaicindex is a pointer typedef.
+ * NOTE(review): not referenced in the visible part of this file.
+ */
+
+typedef struct MOSINDEX{
+       lng offset;             // header location within compressed heap
+       lng nullcnt;    // number of nulls encountered
+       ValRecord low,hgh; // zone value markers for fix-length types
+} *mosaicindex;
+
+/* Run through a column to produce a compressed version */
+
+/* simply include the details of the hardwired compressors */
+#include "mosaic_none.c"
+#include "mosaic_rle.c"
+
+#ifdef _DEBUG_MOSAIC_
+/* Print one labelled row holding the MOSAIC_METHODS lng counters of 'row'. */
+static void
+MOSdumpCounters(Client cntxt, const char *label, const lng *row)
+{
+       int m = 0;
+
+       mnstr_printf(cntxt->fdout,"%s",label);
+       while (m < MOSAIC_METHODS)
+               mnstr_printf(cntxt->fdout,LLFMT " ",row[m++]);
+}
+
+/*
+ * Debug aid (only compiled with _DEBUG_MOSAIC_): write the task type, the
+ * remaining element count and all per-method statistics of 'task' to the
+ * client's output stream, one '#'-prefixed line per statistic.
+ */
+static void
+MOSdumpTask(Client cntxt,MOStask task)
+{
+       int m;
+
+       mnstr_printf(cntxt->fdout,"#type %d todo "LLFMT"\n", task->type, (lng)task->elm);
+       MOSdumpCounters(cntxt,"#winners ",task->winners);
+       MOSdumpCounters(cntxt,"\n#elements ",task->elements);
+       MOSdumpCounters(cntxt,"\n#xsize ",task->xsize);
+       MOSdumpCounters(cntxt,"\n#time ",task->time);
+       // percentage[] holds int values, so it cannot go through the lng helper
+       mnstr_printf(cntxt->fdout,"\n#percentage ");
+       for (m = 0; m != MOSAIC_METHODS; ++m)
+               mnstr_printf(cntxt->fdout, "%d ",task->percentage[m]);
+       MOSdumpCounters(cntxt,"\n#timing ",task->timing);
+       mnstr_printf(cntxt->fdout,"\n");
+}
+#endif
+
+/*
+ * Walk the block headers of the compressed tail heap of 'b' and let the
+ * matching per-method routine print each block to the client.
+ *
+ * The first block header is expected MosaicHdrSize bytes past the start of
+ * the tail heap. The MOSdump_* routines are defined in the included
+ * mosaic_none.c / mosaic_rle.c.
+ * NOTE(review): the loop relies on those routines to decrease task->elm and
+ * move task->hdr to the next block - confirm against their definitions.
+ *
+ * The function returns void and so cannot report an error: if the task
+ * cannot be allocated, nothing is dumped. An unrecognised tag ends the walk.
+ */
+static void
+MOSdumpInternal(Client cntxt, BAT *b){
+       MOStask task;
+       int stop = 0;
+
+       // allocate before taking the lock so the failure path needs no unlock
+       task= (MOStask) GDKzalloc(sizeof(*task));
+       if( task == NULL)
+               return;
+
+       MT_lock_set(&mal_profileLock,"mosaicdump");
+       task->type = b->ttype;
+       task->elm =  b->T->heap.count;
+       task->compressed = task->srcheap = (void*) Tloc(b, BUNfirst(b));
+       task->compressed += MosaicHdrSize;
+       task->hdr = (MosaicBlk) task->compressed;
+       // loop thru the chunks
+       while( !stop && task->elm  >0){
+               switch(task->hdr->tag){
+               case MOSAIC_NONE: MOSdump_none(cntxt,task); break;
+               case MOSAIC_RLE: MOSdump_rle(cntxt,task); break;
+               default:
+                       assert(0);
+                       // without assertions an unknown tag would spin forever while holding the lock
+                       stop = 1;
+               }
+       }
+       MT_lock_unset(&mal_profileLock,"mosaicdump");
+       GDKfree(task);
+}
+/*
+ * MAL entry point: dump the compressed blocks of the BAT whose id is
+ * passed as argument 1 of the instruction.
+ * Returns MAL_SUCCEED, also when the BAT is not compressed (nothing is
+ * printed then); raises a MAL exception when the BAT cannot be accessed.
+ */
+str
+MOSdump(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int bid = *(int*) getArgReference(stk,pci,1);
+       BAT *b = BATdescriptor(bid);
+
+       (void) mb;
+
+       if (b == NULL)
+               throw(MAL,"mosaic.dump",INTERNAL_BAT_ACCESS);
+       // only a compressed tail heap has block headers to walk
+       if (b->T->heap.compressed)
+               MOSdumpInternal(cntxt,b);
+       BBPreleaseref(bid);
+       return MAL_SUCCEED;
+}
+
+/*
+ * Make column 'cn' of BAT 'bn' a view on column 'c' of BAT 'b'.
+ *
+ * bn, cn: target BAT and the column of it (head or tail) to overwrite
+ * b, c:   source BAT and the column of it (head or tail) to inherit
+ * p:      view parent id matching c (VIEWhparent(b) or VIEWtparent(b));
+ *         when 0, b's own batCacheid is used as the parent
+ *
+ * The asserts require that cn carries no props, vheap or hash yet and that
+ * bn and b agree on deleted/first/inserted/count.
+ * Returns bn.
+ */
+static BAT*
+inheritCOL( BAT *bn, COLrec *cn, BAT *b, COLrec *c, bat p )
+{
+       /* inherit column as view */
+       /* remember cn's own id: the struct copy below overwrites it */
+       str nme = cn->id;
+
+       assert((b->H == c && p == VIEWhparent(b)) ||
+              (b->T == c && p == VIEWtparent(b)));
+       assert(bn->H == cn || bn->T == cn);
+       assert(cn->props == NULL);
+       assert(cn->vheap == NULL);
+       assert(cn->hash  == NULL);
+       assert(bn->S->deleted  == b->S->deleted );
+       assert(bn->S->first    == b->S->first   );
+       assert(bn->S->inserted == b->S->inserted);
+       assert(bn->S->count    == b->S->count   );
+
+       /* cn's own heap is handed to HEAPfree before the copy replaces the heap record */
+       HEAPfree(&cn->heap);
+
+       if (p == 0)
+               p = b->batCacheid;
+       bn->S->capacity = MIN( bn->S->capacity, b->S->capacity );
+       /* wholesale copy of the column record; the fields below are fixed up afterwards */
+       *cn = *c;
+       /* NOTE(review): presumably registers an extra share on the parent BAT - confirm BBPshare semantics */
+       BBPshare(p);
+       /* after the copy cn->vheap is c's vheap */
+       if (cn->vheap) {
+               assert(cn->vheap->parentid > 0);
+               BBPshare(cn->vheap->parentid);
+       }
+       cn->heap.copied = 0;
+       cn->props = NULL;
+       cn->heap.parentid = p;
+       cn->id = nme;
+       /* a hash pointer is only taken over when b is not itself a view */
+       if (isVIEW(b))
+               cn->hash = NULL;
+       else
+               cn->hash = c->hash;
+
+       return bn;
+}
+
+/*
+ * Compression is focussed on a single column.
+ * Multiple compression techniques are applied at the same time.
+ */
+
+/*
+ * Open a new block at the current write position of 'task': the header is
+ * initialised as an empty MOSAIC_NONE block and the write pointer is moved
+ * past it. Nothing happens when no elements are left.
+ */
+static void
+MOSinit(MOStask task){
+       MosaicBlk blk;
+
+       if ( task->elm <= 0)
+               return;
+
+       blk = (MosaicBlk) task->compressed;
+       blk->tag = MOSAIC_NONE;
+       blk->cnt = 0;
+       task->hdr = blk;
+       // step over the header; wordaligned pads the step to a multiple of sizeof(int)
+       wordaligned(task->compressed,MosaicBlkSize);
+}
+
+/*
+ * Finish the current block of 'task'. A block that never received any
+ * elements (cnt == 0) is dropped again by moving the write pointer back
+ * over its header; a non-empty block is left untouched.
+ *
+ * The step back is the block header size rounded up to a multiple of
+ * sizeof(int), i.e. the same padded distance MOSinit advances by. It is
+ * computed as a positive size and subtracted, because negating the
+ * unsigned result of sizeof wraps around instead of producing a negative
+ * offset.
+ */
+static void
+MOSclose(MOStask task){
+       if( task->hdr->cnt != 0)
+               return;
+       task->compressed -= MosaicBlkSize +
+               (MosaicBlkSize % sizeof(int) ? sizeof(int) - MosaicBlkSize % sizeof(int) : 0);
+}
+
+str
+MOScompressInternal(Client cntxt, int *ret, int *bid, int threshold)
+{
+       BAT *b, *bn;
+       BUN cnt;
+       str msg = MAL_SUCCEED;
+       MOStask task;
+       int cand;
+       lng chunksize=0, ch;
+
+       if( threshold == 0)
+               threshold = COMPRESS_THRESHOLD;
+       if ((b = BATdescriptor(*bid)) == NULL)
+               throw(MAL, "mosaic.compress", INTERNAL_BAT_ACCESS);
+
+       if ( b->ttype == TYPE_void){
+               // void columns are already compressed
+               BBPreleaseref(*ret = b->batCacheid);
+               return msg;
+       }
+
+       if (b->T->heap.compressed) {
+               BBPreleaseref(b->batCacheid);
+               return msg;     // don't compress twice
+       }
+
+       cnt = BATcount(b);
+       if ( isVIEWCOMBINE(b) || cnt < MIN_INPUT_COUNT ){
+               /* no need to compress */
+               BBPkeepref(*ret = b->batCacheid);
+               return msg;
+       }
+
+#ifdef _DEBUG_MOSAIC_
+       mnstr_printf(cntxt->fdout,"#compress bat %d threshold 
%d\n",*bid,threshold);
+#endif
+       // allocate space for the compressed version.
+       // It should always take less space then the orginal column.
+       // But be prepared that a last block header may  be stored
+       // use a small size overshoot
+       bn = BATnew( TYPE_void, b->ttype, cnt+MosaicBlkSize, TRANSIENT);
+       if (bn == NULL) {
+               BBPreleaseref(b->batCacheid);
+               throw(MAL,"mosaic.compress", MAL_MALLOC_FAIL);
+       }
+
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to