Hello Ernest,
here is the latest version of the lzip input/output stream classes. I
fixed some issues since the last published version. In my humble opinion,
these two classes are stable now. Any code reviews are welcome!

Kenton, if there is a repository for external utility classes like
these, please let me know. If my other library reaches a state where
I can publish it, I will create a project for it, together with a
repository containing the above classes. However, since it is still a
work in progress with lots of changes to its API, I prefer to keep it
unpublished for now.

Regards, Jacob

-- 
You received this message because you are subscribed to the Google Groups 
"Protocol Buffers" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/protobuf?hl=en.

// This file contains the declaration of classes
// LzipInputStream and LzipOutputStream used to compress
// and decompress Google's Protocol Buffer streams using
// the Lempel-Ziv-Markov chain algorithm (via lzlib).
//
// Derived from
// http://protobuf.googlecode.com/svn/tags/2.2.0/src/google/protobuf/io/gzip_stream.h
// Copyright 2010 by Jacob Rief <[email protected]>

#ifndef GOOGLE_PROTOBUF_IO_LZIP_STREAM_H__
#define GOOGLE_PROTOBUF_IO_LZIP_STREAM_H__

#include <stdint.h>
#include <lzlib.h>
#include <google/protobuf/io/zero_copy_stream.h>

namespace google {
namespace protobuf {
namespace io {

// A ZeroCopyInputStream that reads lzip-compressed data from an underlying
// ZeroCopyInputStream and decompresses it through lzlib.
class LIBPROTOBUF_EXPORT LzipInputStream : public ZeroCopyInputStream {
 public:
  // Creates a stream that decompresses the data read from sub_stream.
  // The sub-stream is not owned: the destructor does not delete it.
  explicit LzipInputStream(ZeroCopyInputStream* sub_stream);

  // Closes the decoder if it is still open and frees the output buffer.
  virtual ~LzipInputStream();

  // Releases the decoder. Returns true if lzlib closed it successfully.
  bool Close();

  // Resets the underlying input stream, the decompressor and all counters.
  void Reset();

  // Forwards the underlying input stream to the beginning of the next
  // compression member. Use this function after repositioning the
  // underlying stream or after a stream error occurred.
  bool Forward();

  // In case of an error, the reason can be checked here.
  inline LZ_Errno ErrorCode() const {
    return errno_;
  }

  // --- implements ZeroCopyInputStream ---
  bool Next(const void** data, int* size);
  void BackUp(int count);
  bool Skip(int count);
  int64 ByteCount() const;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(LzipInputStream);

  // Feeds one chunk of compressed input to the decoder (or signals the end
  // of input) and reads decoded data into the free part of the output buffer.
  void Decompress();

  // compressed input stream (not owned)
  ZeroCopyInputStream* sub_stream_;
  // set once the sub-stream is exhausted or the decoder refused input
  bool finished_;

  // plain text output buffer: [output_position_, next_out_) holds decoded
  // bytes not yet handed out by Next(); avail_out_ bytes are still free.
  const int output_buffer_length_;
  void* const output_buffer_;
  uint8_t* output_position_;
  uint8_t* next_out_;
  int avail_out_;

  // Lzip decoder
  LZ_Decoder* decoder_;
  // last lzlib error, reported by ErrorCode()
  LZ_Errno errno_;
};

// A ZeroCopyOutputStream that compresses the data written to it with lzlib
// and writes the compressed bytes to an underlying ZeroCopyOutputStream.
class LIBPROTOBUF_EXPORT LzipOutputStream : public ZeroCopyOutputStream {
 public:
  // Creates a LzipOutputStream that writes to sub_stream, which is not owned
  // (the destructor does not delete it). compression_level selects one of
  // the nine entries of options[] and must be in 1..9; the constructor
  // CHECKs this. member_size is passed to LZ_compress_open() as the member
  // size limit; presumably the default, kint64max, means "no practical
  // limit" - confirm against the lzlib documentation.
  explicit LzipOutputStream(ZeroCopyOutputStream* sub_stream, size_t 
compression_level = 5, int64_t member_size = kint64max);

  // Calls Close() if the encoder is still open, then frees the input buffer.
  virtual ~LzipOutputStream();

  // Flushes data written so far to zipped data in the underlying stream.
  // It is the caller's responsibility to flush the underlying stream if
  // necessary.
  // Compression may be less efficient when stopping and starting around
  // flushes.
  // Returns true if no error.
  bool Flush();

  // Flushes data written so far to zipped data in the underlying stream
  // and starts a new lzip member. It is the caller's responsibility to
  // flush the underlying stream if necessary.
  // Starting a new member costs much more compression efficiency than
  // calling Flush().
  // Returns true if no error.
  bool Restart();

  // Writes out all data and closes the lzip stream.
  // It is the caller's responsibility to close the underlying stream if
  // necessary.
  // Returns true if no error.
  bool Close();

  // --- implements ZeroCopyOutputStream ---
  bool Next(void** data, int* size);
  void BackUp(int count);
  int64 ByteCount() const;
  // Not part of the ZeroCopyOutputStream interface: clears the finished and
  // error state, resets the input position and resets the underlying
  // stream. The lzlib encoder itself is not touched.
  void Reset();

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(LzipOutputStream);

  // Feeds the pending plain text to the encoder and copies the compressed
  // output to sub_stream_; with flush set, also requests an lzlib sync flush.
  void Compress(bool flush = false);

  // plain text input buffer: [input_buffer_, input_position_) is pending
  // data that has not yet been fed to the encoder.
  const int input_buffer_length_;
  void* const input_buffer_;
  uint8_t* input_position_;
  uint8_t* const input_buffer_end_;

  // compressed output stream (not owned)
  ZeroCopyOutputStream* sub_stream_;
  // set once sub_stream_ refuses to provide more space
  bool finished_;

  // Lzip encoder settings, one entry per compression level 1..9
  // (index = level - 1).
  struct Options {
    int dictionary_size; // 4KiB..512MiB
    int match_len_limit; // 5..273
  };
  static const Options options[9];

  LZ_Encoder* encoder_;
  // member size limit, reused whenever a new member is started
  const uint64_t member_size_;
  // last lzlib error
  LZ_Errno errno_;
};

}  // namespace io
}  // namespace protobuf
}  // namespace google

#endif  // GOOGLE_PROTOBUF_IO_LZIP_STREAM_H__
// This file contains the implementation of classes
// LzipInputStream and LzipOutputStream used to compress
// and decompress Google's Protocol Buffer streams using
// the Lempel-Ziv-Markov chain algorithm (via lzlib).
//
// Derived from
// http://protobuf.googlecode.com/svn/tags/2.2.0/src/google/protobuf/io/gzip_stream.cc
// Copyright 2010 by Jacob Rief <[email protected]>

#include <google/protobuf/stubs/common.h>
#include "google/protobuf/io/lzip_stream.h"

namespace google {
namespace protobuf {
namespace io {

// Size in bytes (8 KiB) of the plain-text buffer owned by each stream object.
static const int kDefaultBufferSize = 8 * 1024;

// === LzipInputStream ===

// Allocates the plain-text output buffer and opens an lzlib decoder; the
// decoder's initial error state becomes errno_.
LzipInputStream::LzipInputStream(ZeroCopyInputStream* sub_stream)
    : sub_stream_(sub_stream),
      finished_(false),
      output_buffer_length_(kDefaultBufferSize),
      output_buffer_(operator new(output_buffer_length_)),
      output_position_(NULL),
      next_out_(NULL),
      avail_out_(0),
      decoder_(LZ_decompress_open()),
      errno_(LZ_ok) {
  GOOGLE_CHECK(output_buffer_ != NULL);
  errno_ = LZ_decompress_errno(decoder_);
}

// Releases the decoder (unless Close() already did) and the output buffer.
LzipInputStream::~LzipInputStream() {
  if (decoder_)
    Close();
  // operator delete on a null pointer is a no-op, so no guard is needed.
  operator delete(output_buffer_);
}

bool LzipInputStream::Close() {
  errno_ = LZ_decompress_errno(decoder_);
  bool ok = LZ_decompress_close(decoder_) == LZ_ok;
  decoder_ = NULL;
  return ok;
}

// Resynchronizes the decoder with the beginning of the next lzip member in
// the underlying stream and decodes the first data of that member into the
// output buffer. Use this after repositioning the underlying stream or after
// a stream error.
// Returns true if decoded data is available for Next(). Returns false on a
// decoder error (see ErrorCode()), or when no data could be decoded; in the
// latter case the stream is marked finished.
bool LzipInputStream::Forward() {
  LZ_decompress_sync_to_member(decoder_);
  output_position_ = next_out_ = static_cast<uint8_t*>(output_buffer_);
  avail_out_ = output_buffer_length_;
  int avail_in, bytes_read = 0;
  const void* next_in;
  while (!finished_ && bytes_read == 0 && sub_stream_->Next(&next_in, &avail_in)) {
    if (avail_in == 0)
      continue;  // ZeroCopyInputStream may return empty chunks; fetch another
    int bytes_written = LZ_decompress_write(decoder_, static_cast<const uint8_t*>(next_in), avail_in);
    if (bytes_written > 0) {
      sub_stream_->BackUp(avail_in - bytes_written);
      bytes_read = LZ_decompress_read(decoder_, next_out_, avail_out_);
    } else {
      // The decoder consumed nothing from this chunk: hand all of it back to
      // the sub-stream instead of silently dropping compressed input.
      sub_stream_->BackUp(avail_in);
      finished_ = true;
    }
  }
  errno_ = LZ_decompress_errno(decoder_);
  if (errno_ == LZ_ok) {
    if (bytes_read > 0) {
      next_out_ += bytes_read;
      avail_out_ -= bytes_read;
      return true;
    }
    // Nothing decodable was found: signal end of input to the decoder.
    if (LZ_decompress_finish(decoder_) < 0)
      errno_ = LZ_decompress_errno(decoder_);
    finished_ = true;
  }
  return false;
}

// --- implements ZeroCopyInputStream ---
// Hands out the decoded bytes not yet returned to the caller. When none are
// buffered, the output buffer is refilled from its start via Decompress().
// Returns false once an lzlib error has been recorded, or when the input is
// exhausted and the decoder reports it has finished.
bool LzipInputStream::Next(const void** data, int* size) {
  if (errno_ != LZ_ok)
    return false;
  GOOGLE_CHECK_GE(next_out_, output_position_);
  const bool drained = (output_position_ == next_out_);
  if (drained) {
    if (finished_ && LZ_decompress_finished(decoder_))
      return false;
    uint8_t* const start = static_cast<uint8_t*>(output_buffer_);
    next_out_ = start;
    output_position_ = start;
    avail_out_ = output_buffer_length_;
    Decompress();
  }
  // Everything between the read cursor and the decoder's write cursor is
  // handed out in one piece; afterwards the read cursor catches up.
  uint8_t* const chunk = output_position_;
  output_position_ = next_out_;
  *data = chunk;
  *size = static_cast<int>(next_out_ - chunk);
  errno_ = LZ_decompress_errno(decoder_);
  return errno_ == LZ_ok;
}

// Returns the last count handed-out bytes to the stream so that the next
// Next() call delivers them again. The CHECK only guarantees that the read
// cursor never moves before the start of the output buffer.
void LzipInputStream::BackUp(int count) {
  GOOGLE_CHECK_GE(output_position_-static_cast<uint8_t*>(output_buffer_), count);
  output_position_ = output_position_ - count;
}

// Skips count bytes of decompressed data. Returns false if the end of the
// stream or an error is reached before count bytes were skipped. Bytes of the
// last successfully obtained chunk beyond count are returned with BackUp().
bool LzipInputStream::Skip(int count) {
  const void* data;
  // Next() may return false without writing *size, so start from a known value.
  int size = 0;
  bool ok = Next(&data, &size);
  while (ok && size < count) {
    count -= size;
    ok = Next(&data, &size);
  }
  // Only give back bytes of a chunk that Next() actually delivered; after a
  // failed Next() `size` may be stale from the previous chunk.
  if (ok && size > count) {
    BackUp(size - count);
  }
  return ok;
}

// Returns the number of decompressed bytes read by the caller: the decoder's
// output total minus the decoded bytes still sitting in the output buffer
// (not yet handed out by Next(), or returned with BackUp()).
// NOTE(review): LZ_decompress_total_out_size() may also count data still
// held inside lzlib's own buffers; confirm against the lzlib version in use.
int64 LzipInputStream::ByteCount() const {
  int64 count = LZ_decompress_total_out_size(decoder_);
  if (next_out_ != NULL && output_position_ != NULL)
    count -= next_out_ - output_position_;
  return count;
}

// Returns the stream to its initial state: the lzlib decoder is reset, its
// error state is re-read, the output buffer is emptied and the sub-stream is
// reset as well.
// NOTE(review): calls sub_stream_->Reset(), which the stock protobuf
// ZeroCopyInputStream interface may not declare; confirm for the version used.
void LzipInputStream::Reset() {
  LZ_decompress_reset(decoder_);
  errno_ = LZ_decompress_errno(decoder_);
  finished_ = false;
  next_out_ = output_position_ = NULL;
  avail_out_ = 0;
  sub_stream_->Reset();
}

// --- private ---
// Feeds one chunk of compressed input into the decoder (or, once the
// sub-stream is exhausted, tells the decoder that no more input follows),
// then reads decoded data into the free part of the output buffer. On an
// lzlib error it returns early; the caller picks the error up via
// LZ_decompress_errno().
void LzipInputStream::Decompress() {
  GOOGLE_CHECK(errno_ == LZ_ok && avail_out_ > 0);
  if (!finished_) {
    const void* chunk;
    int chunk_size;
    if (!sub_stream_->Next(&chunk, &chunk_size)) {
      if (LZ_decompress_finish(decoder_) < 0)
        return;
      finished_ = true;
    } else {
      const int accepted = LZ_decompress_write(decoder_, static_cast<const uint8_t*>(chunk), chunk_size);
      if (accepted < 0)
        return;
      // Give back the part of the chunk the decoder could not take yet.
      sub_stream_->BackUp(chunk_size - accepted);
    }
  }
  const int produced = LZ_decompress_read(decoder_, next_out_, avail_out_);
  if (produced >= 0) {
    next_out_ += produced;
    avail_out_ -= produced;
  }
}

// === LzipOutputStream ===

// Allocates the plain-text input buffer and opens an lzlib encoder configured
// from options[compression_level - 1]. compression_level must be in 1..9.
LzipOutputStream::LzipOutputStream(ZeroCopyOutputStream* sub_stream,
                                   size_t compression_level,
                                   int64_t member_size)
    : input_buffer_length_(kDefaultBufferSize),
      input_buffer_(operator new(input_buffer_length_)),
      input_position_(static_cast<uint8_t*>(input_buffer_)),
      input_buffer_end_(input_position_ + input_buffer_length_),
      sub_stream_(sub_stream),
      finished_(false),
      member_size_(member_size) {
  GOOGLE_CHECK(input_buffer_ != NULL);
  GOOGLE_CHECK_GT(compression_level, 0);
  // Public levels are 1-based, the options table is 0-based.
  compression_level--;
  GOOGLE_CHECK_LT(compression_level, sizeof(options)/sizeof(Options));
  const Options& level = options[compression_level];
  encoder_ = LZ_compress_open(level.dictionary_size, level.match_len_limit, member_size);
  errno_ = LZ_compress_errno(encoder_);
  GOOGLE_CHECK(errno_ == LZ_ok);
}

// Finishes the lzip stream via Close() if the encoder is still open, then
// frees the input buffer.
LzipOutputStream::~LzipOutputStream() {
  if (encoder_)
    Close();
  // operator delete on a null pointer is a no-op, so no guard is needed.
  operator delete(input_buffer_);
}

bool LzipOutputStream::Flush() {
  Compress(true);
  input_position_ = static_cast<uint8_t*>(input_buffer_);
  return true;
}

// Compresses all pending input, finishes the current lzip member, copies its
// remaining compressed bytes to the underlying stream and starts a new member.
// Returns false if the stream is already finished or the underlying stream
// refuses to provide more space.
bool LzipOutputStream::Restart() {
  if (finished_)
    return false;
  Compress();
  GOOGLE_CHECK(LZ_compress_finish(encoder_) == LZ_ok);
  // Drain the encoder; stop as soon as the member has been read out completely.
  for (;;) {
    void* next_out;
    int avail_out;
    if (!sub_stream_->Next(&next_out, &avail_out))
      return false;  // disk full?
    int bytes_read = LZ_compress_read(encoder_, static_cast<uint8_t*>(next_out), avail_out);
    errno_ = LZ_compress_errno(encoder_);
    GOOGLE_CHECK(errno_ == LZ_ok);
    GOOGLE_CHECK_GE(bytes_read, 0);
    sub_stream_->BackUp(avail_out - bytes_read);
    if (LZ_compress_member_finished(encoder_))
      break;
  }
  GOOGLE_CHECK(LZ_compress_restart_member(encoder_, member_size_) == LZ_ok);
  input_position_ = static_cast<uint8_t*>(input_buffer_);
  return true;
}

// Writes out all pending data, finishes the lzip stream and releases the
// encoder. It is the caller's responsibility to close the underlying stream
// if necessary. Returns true if no error.
// The encoder is released on every path, including when the underlying
// stream runs out of space. Without that, the encoder would leak, and a second
// call (for example from the destructor) would feed a NULL encoder to lzlib.
bool LzipOutputStream::Close() {
  if (encoder_ == NULL)
    return false;  // already closed
  bool ok = !finished_;
  if (ok) {
    Compress();
    ok = !finished_;  // Compress() sets finished_ when the sub-stream is full
  }
  if (ok) {
    GOOGLE_CHECK(LZ_compress_finish(encoder_) == LZ_ok);
    do {
      int avail_out;
      void* next_out;
      if (!sub_stream_->Next(&next_out, &avail_out)) {
        // disk full?
        ok = false;
        break;
      }
      int bytes_read = LZ_compress_read(encoder_, static_cast<uint8_t*>(next_out), avail_out);
      errno_ = LZ_compress_errno(encoder_);
      GOOGLE_CHECK(errno_ == LZ_ok);
      GOOGLE_CHECK_GE(bytes_read, 0);
      sub_stream_->BackUp(avail_out - bytes_read);
    } while (!LZ_compress_finished(encoder_));
  }
  GOOGLE_CHECK(LZ_compress_close(encoder_) == LZ_ok);
  encoder_ = NULL;
  finished_ = true;
  return ok;
}

// --- implements ZeroCopyOutputStream ---
// Hands out writable space in the input buffer. If the buffer still has an
// unused tail, that tail is returned. Otherwise the full buffer is compressed
// first and then handed out again in its entirety. Returns false only when the
// buffer is full and the stream is finished.
bool LzipOutputStream::Next(void** data, int* size) {
  GOOGLE_CHECK_LE(input_position_, input_buffer_end_);
  if (input_position_ != input_buffer_end_) {
    *data = input_position_;
    *size = input_buffer_end_ - input_position_;
  } else {
    if (finished_)
      return false;
    Compress();
    *data = input_buffer_;
    *size = input_buffer_length_;
  }
  // Everything up to the end of the buffer now belongs to the caller.
  input_position_ = input_buffer_end_;
  return true;
}

void LzipOutputStream::BackUp(int count) {
  GOOGLE_CHECK_LE(input_buffer_length_ - count, input_position_ - 
static_cast<uint8_t*>(input_buffer_));
  input_position_ -= count;
}

// Returns the number of plain-text bytes written to this stream: the input
// total lzlib reports, plus the bytes still pending in the input buffer
// (handed out by Next() and not returned with BackUp()). Without the second
// term, data the caller has already written is not counted until the next
// Compress().
// NOTE(review): LZ_compress_total_in_size() may count only data the encoder
// has already processed; confirm against the lzlib version in use.
int64 LzipOutputStream::ByteCount() const {
  int64 count = LZ_compress_total_in_size(encoder_);
  if (encoder_ != NULL && input_position_ != NULL)
    count += input_position_ - static_cast<uint8_t*>(input_buffer_);
  return count;
}

void LzipOutputStream::Reset()
{
  finished_ = false;
  errno_ = LZ_ok;
  input_position_ = NULL;
  sub_stream_->Reset();
}

// --- private ---
// Feeds the pending plain text [input_buffer_, input_position_) into the
// encoder and copies all compressed output it produces into sub_stream_,
// restarting the member whenever lzlib reports one as finished.
// If flush is true, an lzlib sync flush is requested once the encoder has
// accepted all pending input, so the flush covers everything written so far.
// If sub_stream_ refuses to provide more space, finished_ is set and the
// function stops immediately.
// Does not reset input_position_; callers do that.
void LzipOutputStream::Compress(bool flush) {
  uint8_t* next_in = static_cast<uint8_t*>(input_buffer_);
  int avail_in = input_position_ - next_in;
  int bytes_written = 0;
  int bytes_read = 0;
  do {
    bytes_written = LZ_compress_write(encoder_, next_in, avail_in);
    errno_ = LZ_compress_errno(encoder_);
    GOOGLE_CHECK(errno_ == LZ_ok);
    GOOGLE_CHECK_GE(bytes_written, 0);
    next_in += bytes_written;
    avail_in -= bytes_written;
    // Flushing before the encoder accepted all input would leave the tail of
    // the buffer outside the flush.
    if (flush && avail_in == 0) {
      GOOGLE_CHECK(LZ_compress_sync_flush(encoder_) == LZ_ok);
      flush = false;
    }
    int avail_out;
    void* next_out;
    if (!sub_stream_->Next(&next_out, &avail_out)) {
      // disk full? Nothing more can be written, so stop here instead of
      // spinning on with a stale bytes_read.
      finished_ = true;
      return;
    }
    bytes_read = LZ_compress_read(encoder_, static_cast<uint8_t*>(next_out), avail_out);
    errno_ = LZ_compress_errno(encoder_);
    GOOGLE_CHECK(errno_ == LZ_ok);
    GOOGLE_CHECK_GE(bytes_read, 0);
    if (LZ_compress_member_finished(encoder_)==1) {
      LZ_compress_restart_member(encoder_, member_size_);
    }
    sub_stream_->BackUp(avail_out - bytes_read);
  } while (bytes_written>0 || bytes_read>0);
}

// Encoder settings indexed by (compression level - 1): the dictionary size in
// bytes and the match length limit handed to LZ_compress_open().
const LzipOutputStream::Options LzipOutputStream::options[9] = {
  //  dictionary_size    match_len_limit
  {  1 * 1024 * 1024,    10 },  // level 1 (1 MiB)
  {  1 * 1024 * 1024,    12 },  // level 2 (1 MiB)
  {  1 * 1024 * 1024,    17 },  // level 3 (1 MiB)
  {  2 * 1024 * 1024,    26 },  // level 4 (2 MiB)
  {  4 * 1024 * 1024,    44 },  // level 5 (4 MiB)
  {  8 * 1024 * 1024,    80 },  // level 6 (8 MiB)
  { 16 * 1024 * 1024,   108 },  // level 7 (16 MiB)
  { 16 * 1024 * 1024,   163 },  // level 8 (16 MiB)
  { 32 * 1024 * 1024,   273 }   // level 9 (32 MiB)
};

}  // namespace io
}  // namespace protobuf
}  // namespace google

Reply via email to