Changeset: f0a793843277 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f0a793843277 Modified Files: common/stream/stream.c Branch: default Log Message:
Added support for lz4 compressed files streams diffs (truncated from 432 to 300 lines): diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -93,6 +93,7 @@ #endif #ifdef HAVE_LIBLZ4 #include <lz4.h> +#include <lz4frame.h> #endif #ifdef HAVE_ICONV @@ -1700,6 +1701,384 @@ open_xzwastream(const char *restrict fil #endif /* ------------------------------------------------------------------ */ +/* streams working on a lz4-compressed disk file, using the LZ4 frame API from lz4frame.h */ + +#ifdef HAVE_LIBLZ4 +#define LZ4DECOMPBUFSIZ 128*1024 /* write-side buffer size; the read-side buffer is LZ4_COMPRESSBOUND of it (see open_lz4stream). NOTE(review): the expansion is not parenthesized */ +typedef struct lz4_stream { + FILE *fp; /* the compressed file on disk */ + size_t total_processing; /* read: bytes of ring_buffer already passed to the decompressor; write: compressed bytes waiting in ring_buffer */ + size_t ring_buffer_size; /* read: valid bytes from the last fread; write: capacity of ring_buffer */ + void* ring_buffer; /* read: compressed input; write: compressed output not yet written to fp */ + union { /* comp_context is used when s->access == ST_WRITE, dec_context otherwise */ + LZ4F_compressionContext_t comp_context; + LZ4F_decompressionContext_t dec_context; + } context; +} lz4_stream; +/* Read up to elmsize*cnt decompressed bytes into buf, refilling ring_buffer from fp whenever it has been fully consumed. Returns the number of complete elements read (0 once end-of-file has been reached), or -1 with s->errnr set. */ +static ssize_t +stream_lz4read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) +{ + lz4_stream *lz4 = s->stream_data.p; + size_t size = elmsize * cnt, total_read = 0, total_decompressed, ret, remaining_to_decompress; + + if (lz4 == NULL || size <= 0) { /* size is unsigned, so this tests size == 0: an empty request is reported as an error */ + s->errnr = MNSTR_READ_ERROR; + return -1; + } + + while (total_read < size) { + if (lz4->total_processing == lz4->ring_buffer_size) { + if(feof(lz4->fp)) { + break; + } else { + lz4->ring_buffer_size = fread(lz4->ring_buffer, 1, LZ4_COMPRESSBOUND(LZ4DECOMPBUFSIZ), lz4->fp); /* NOTE(review): feof() may still be false when the previous fread ended exactly at the end of the file; this fread then returns 0 and is reported below as an error rather than as end-of-file; confirm */ + if (lz4->ring_buffer_size == 0 || ferror(lz4->fp)) { + s->errnr = MNSTR_READ_ERROR; + return -1; + } + lz4->total_processing = 0; + } + } + + remaining_to_decompress = size - total_read; /* in: free space left in buf; out: decompressed bytes produced */ + total_decompressed = lz4->ring_buffer_size - lz4->total_processing; /* in: compressed bytes available; out: compressed bytes consumed (despite the name) */ + ret = LZ4F_decompress(lz4->context.dec_context, (char*)buf + total_read, &remaining_to_decompress, + (char*)lz4->ring_buffer + lz4->total_processing, &total_decompressed, NULL); + if(LZ4F_isError(ret)) { + s->errnr = MNSTR_WRITE_ERROR; /* NOTE(review): this is the read path; MNSTR_READ_ERROR looks intended */ + return -1; + } + + lz4->total_processing += total_decompressed; + total_read += remaining_to_decompress; + } + +#ifdef WIN32 + /* on
Windows when in text mode, convert \r\n line endings to \n */ + if (s->type == ST_ASCII) { + char *p1, *p2, *pe; + + p1 = buf; + pe = p1 + total_read; + while (p1 < pe && *p1 != '\r') + p1++; + p2 = p1; + while (p1 < pe) { + if (*p1 == '\r' && p1[1] == '\n') /* NOTE(review): when p1 == pe - 1 this reads one byte beyond the data returned (beyond buf itself if total_read == size), and a CR LF pair split across two reads is not converted */ + total_read--; + else + *p2++ = *p1; + p1++; + } + } +#endif + return (ssize_t) (total_read / elmsize); /* NOTE(review): bytes of a trailing partial element are consumed from the stream but not reported */ +} +/* Compress elmsize*cnt bytes from buf into ring_buffer, writing ring_buffer to fp whenever it is exactly full. Returns cnt on success, or -1 with s->errnr set. */ +static ssize_t +stream_lz4write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt) +{ + lz4_stream *lz4 = s->stream_data.p; + size_t ret, size = elmsize * cnt, total_written = 0, next_batch, next_attempt, available, real_written; + + if (lz4 == NULL || size > LZ4_MAX_INPUT_SIZE || size <= 0) { /* size <= 0 means size == 0 here (size_t is unsigned) */ + s->errnr = MNSTR_WRITE_ERROR; + return -1; + } + + while (total_written < size) { + next_batch = size - total_written; + available = lz4->ring_buffer_size - lz4->total_processing; + do { /* halve next_batch until its worst-case compressed size fits in the free part of ring_buffer */ + next_attempt = LZ4F_compressBound(next_batch, NULL); /* lz4->ring_buffer must be at least 65548 bytes */ + if(next_attempt > available) { + next_batch >>= 1; + } else { + break; + } + if(next_batch == 0) + break; + } while(1); + assert(next_batch > 0); /* NOTE(review): ring_buffer is only drained when total_processing == ring_buffer_size exactly (below). Once the free space drops under LZ4F_compressBound(1, NULL) the loop above reaches 0, so this assert fires; with NDEBUG the outer loop stops making progress. Draining ring_buffer when the bound does not fit seems required; confirm */ + + ret = LZ4F_compressUpdate(lz4->context.comp_context, ((char*)lz4->ring_buffer) + lz4->total_processing, + available, ((char*)buf) + total_written, next_batch, NULL); /* NOTE(review): no LZ4F_compressBegin call (which writes the frame header) is visible before this in the diff shown; check the truncated rest of open_lz4stream */ + if(LZ4F_isError(ret)) { + s->errnr = MNSTR_WRITE_ERROR; + return -1; + } else { + lz4->total_processing += ret; + } + + if(lz4->total_processing == lz4->ring_buffer_size) { + real_written = fwrite((void *)lz4->ring_buffer, 1, lz4->total_processing, lz4->fp); + if (real_written == 0) { /* NOTE(review): a short (partial) fwrite is not detected; comparing against total_processing would catch it */ + s->errnr = MNSTR_WRITE_ERROR; + return -1; + } + lz4->total_processing = 0; + } + total_written += next_batch; + } + + return (ssize_t) (total_written / elmsize); +} +/* Finish and release the stream. For write streams: write compressed bytes still pending in ring_buffer, end the LZ4 frame with LZ4F_compressEnd, write that output and fflush fp. Then free the LZ4 context, close fp and free the buffers. */ +static void +stream_lz4close(stream *s) +{ + lz4_stream *lz4 = s->stream_data.p; + + if (lz4) { + if (s->access == ST_WRITE) { + size_t ret, real_written; + + if (lz4->total_processing > 0 && lz4->total_processing < lz4->ring_buffer_size)
{ /* write out compressed bytes still pending in ring_buffer */ + real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp); + if (real_written == 0) { + s->errnr = MNSTR_WRITE_ERROR; + return ; /* NOTE(review): this and the two early returns below skip freeing the LZ4 context, closing fp and freeing the buffers, and leave s->stream_data.p set */ + } + lz4->total_processing = 0; + } /* finish compression */ + ret = LZ4F_compressEnd(lz4->context.comp_context, lz4->ring_buffer, lz4->ring_buffer_size, NULL); /* flushes remaining input and writes the frame end mark into ring_buffer */ + if(LZ4F_isError(ret)) { + s->errnr = MNSTR_WRITE_ERROR; + return ; + } + assert(ret < LZ4DECOMPBUFSIZ); + lz4->total_processing = ret; + + real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp); + if (real_written == 0) { + s->errnr = MNSTR_WRITE_ERROR; + return ; + } + lz4->total_processing = 0; + + fflush(lz4->fp); /* return value ignored, as is that of fclose below */ + } + if(s->access == ST_WRITE) { + (void) LZ4F_freeCompressionContext(lz4->context.comp_context); + } else { + (void) LZ4F_freeDecompressionContext(lz4->context.dec_context); + } + fclose(lz4->fp); + free(lz4->ring_buffer); + free(lz4); + } + s->stream_data.p = NULL; +} +/* Write compressed bytes pending in ring_buffer, make the compressor emit its buffered input with LZ4F_flush, write that and fflush fp. Read streams: no-op returning 0. Returns 0 on success, -1 on error. */ +static int +stream_lz4flush(stream *s) +{ + lz4_stream *lz4 = s->stream_data.p; + size_t real_written, ret; + + if (lz4 == NULL) + return -1; + if (s->access == ST_WRITE) { + if (lz4->total_processing > 0 && lz4->total_processing < lz4->ring_buffer_size) { /* write out compressed bytes still pending in ring_buffer */ + real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp); + if (real_written == 0) { + s->errnr = MNSTR_WRITE_ERROR; + return -1; + } + lz4->total_processing = 0; + } + ret = LZ4F_flush(lz4->context.comp_context, lz4->ring_buffer, lz4->ring_buffer_size, NULL); /* flush it */ + if(LZ4F_isError(ret)) { + s->errnr = MNSTR_WRITE_ERROR; + return -1; + } + lz4->total_processing = ret; + real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp); /* NOTE(review): LZ4F_flush may return 0 when nothing is buffered; fwrite of 0 bytes returns 0, so the check below would then report a spurious error */ + if (real_written == 0) { + s->errnr = MNSTR_WRITE_ERROR; + return -1; + } + lz4->total_processing = 0; + + if(fflush(lz4->fp)) + return -1; + } + return 0; +} +/* Open filename as an lz4 stream: when flags[0] is the letter w the stream compresses, otherwise it decompresses (the code expects r or w). */ +static stream * +open_lz4stream(const char *restrict filename, const char *restrict flags) +{ + stream *s; + lz4_stream
*lz4; + LZ4F_errorCode_t error_code; + char fl[3]; + size_t buffer_size = (flags[0] == 'r') ? LZ4_COMPRESSBOUND(LZ4DECOMPBUFSIZ) : LZ4DECOMPBUFSIZ; /* read: room for one full fread of compressed input; write: the compressed output buffer */ + + if ((lz4 = malloc(sizeof(struct lz4_stream))) == NULL) + return NULL; + if ((lz4->ring_buffer = malloc(buffer_size)) == NULL) { + free(lz4); + return NULL; + } + lz4->total_processing = (flags[0] == 'r') ? buffer_size : 0; /* read: mark the buffer as fully consumed so the first read refills it */ + lz4->ring_buffer_size = buffer_size; + + if(flags[0] == 'w') { + error_code = LZ4F_createCompressionContext(&(lz4->context.comp_context), LZ4F_VERSION); + } else { + error_code = LZ4F_createDecompressionContext(&(lz4->context.dec_context), LZ4F_VERSION); + } + if(LZ4F_isError(error_code)) { + free(lz4->ring_buffer); + free(lz4); + return NULL; + } + + if ((s = create_stream(filename)) == NULL) { + if(flags[0] == 'w') { + (void) LZ4F_freeCompressionContext(lz4->context.comp_context); + } else { + (void) LZ4F_freeDecompressionContext(lz4->context.dec_context); + } + free(lz4->ring_buffer); + free(lz4); + return NULL; + } + fl[0] = flags[0]; /* 'r' or 'w' */ + fl[1] = 'b'; /* always binary */ + fl[2] = '\0'; +#ifdef HAVE__WFOPEN + { + wchar_t *wfname = utf8towchar(filename); + wchar_t *wflags = utf8towchar(fl); + if (wfname != NULL) /* NOTE(review): wflags is not checked for NULL before the _wfopen call */ + lz4->fp = _wfopen(wfname, wflags); + else + lz4->fp = NULL; + if (wfname) + free(wfname); + if (wflags) + free(wflags); + } +#else + { + char *fname = cvfilename(filename); + if (fname) { + lz4->fp = fopen(fname, fl); + free(fname); + } else + lz4->fp = NULL; + } +#endif + if (lz4->fp == NULL) { + destroy(s); + if(flags[0] == 'w') { + (void) LZ4F_freeCompressionContext(lz4->context.comp_context); + } else { + (void) LZ4F_freeDecompressionContext(lz4->context.dec_context); + } + free(lz4->ring_buffer); + free(lz4); + return NULL; + } + s->read = stream_lz4read; + s->write = stream_lz4write; + s->close = stream_lz4close; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list