Author: cutting
Date: Tue Jan 7 21:53:54 2014
New Revision: 1556373
URL: http://svn.apache.org/r1556373
Log:
AVRO-1414. C++: Add support for deflate-compressed data files. Contributed by
Daniel Russel.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/c++/CMakeLists.txt
avro/trunk/lang/c++/api/DataFile.hh
avro/trunk/lang/c++/impl/DataFile.cc
avro/trunk/lang/c++/test/DataFileTests.cc
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jan 7 21:53:54 2014
@@ -27,6 +27,9 @@ Trunk (not yet released)
AVRO-1379. C: avro_file_writer_append_encoded() function.
(Mark Teodoro via dcreager)
+ AVRO-1414. C++: Add support for deflate-compressed data files.
+ (Daniel Russel via cutting)
+
OPTIMIZATIONS
AVRO-1348. Java: Improve UTF-8 to String conversion performance in
Modified: avro/trunk/lang/c++/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/CMakeLists.txt?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/CMakeLists.txt (original)
+++ avro/trunk/lang/c++/CMakeLists.txt Tue Jan 7 21:53:54 2014
@@ -52,7 +52,7 @@ endif ()
find_package (Boost 1.38 REQUIRED
- COMPONENTS filesystem system program_options)
+ COMPONENTS filesystem system program_options iostreams)
add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})
Modified: avro/trunk/lang/c++/api/DataFile.hh
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/api/DataFile.hh?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/api/DataFile.hh (original)
+++ avro/trunk/lang/c++/api/DataFile.hh Tue Jan 7 21:53:54 2014
@@ -32,9 +32,17 @@
#include "boost/array.hpp"
#include "boost/utility.hpp"
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/scoped_ptr.hpp>
namespace avro {
+/** Specify type of compression to use when writing data files. */
+enum Codec {
+ NULL_CODEC,
+ DEFLATE_CODEC
+};
+
/**
* The sync value.
*/
@@ -50,6 +58,7 @@ class AVRO_DECL DataFileWriterBase : boo
const ValidSchema schema_;
const EncoderPtr encoderPtr_;
const size_t syncInterval_;
+ Codec codec_;
std::auto_ptr<OutputStream> stream_;
std::auto_ptr<OutputStream> buffer_;
@@ -76,7 +85,7 @@ public:
* Returns the current encoder for this writer.
*/
Encoder& encoder() const { return *encoderPtr_; }
-
+
/**
* Returns true if the buffer has sufficient data for a sync to be
* inserted.
@@ -93,7 +102,7 @@ public:
* Constructs a data file writer with the given sync interval and name.
*/
DataFileWriterBase(const char* filename, const ValidSchema& schema,
- size_t syncInterval);
+ size_t syncInterval, Codec codec = NULL_CODEC);
~DataFileWriterBase();
/**
@@ -124,8 +133,8 @@ public:
* Constructs a new data file.
*/
DataFileWriter(const char* filename, const ValidSchema& schema,
- size_t syncInterval = 64 * 1024) :
- base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
+ size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
+ base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) {
}
/**
* Writes the given piece of data into the file.
@@ -162,6 +171,7 @@ class AVRO_DECL DataFileReaderBase : boo
const DecoderPtr decoder_;
int64_t objectCount_;
bool eof_;
+ Codec codec_;
ValidSchema readerSchema_;
ValidSchema dataSchema_;
@@ -172,6 +182,10 @@ class AVRO_DECL DataFileReaderBase : boo
Metadata metadata_;
DataFileSync sync_;
+ // for compressed blocks: the decompressing input stream and the raw compressed bytes it reads
+ boost::scoped_ptr<boost::iostreams::filtering_istream> os_;
+ std::vector<char> compressed_;
+
void readHeader();
bool readDataBlock();
@@ -315,5 +329,3 @@ public:
} // namespace avro
#endif
-
-
Modified: avro/trunk/lang/c++/impl/DataFile.cc
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/DataFile.cc?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/DataFile.cc (original)
+++ avro/trunk/lang/c++/impl/DataFile.cc Tue Jan 7 21:53:54 2014
@@ -23,6 +23,9 @@
#include <sstream>
#include <boost/random/mersenne_twister.hpp>
+#include <boost/iostreams/device/file.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
namespace avro {
using std::auto_ptr;
@@ -34,14 +37,25 @@ using std::string;
using boost::array;
+namespace {
const string AVRO_SCHEMA_KEY("avro.schema");
const string AVRO_CODEC_KEY("avro.codec");
const string AVRO_NULL_CODEC("null");
+const string AVRO_DEFLATE_CODEC("deflate");
const size_t minSyncInterval = 32;
const size_t maxSyncInterval = 1u << 30;
const size_t defaultSyncInterval = 64 * 1024;
+boost::iostreams::zlib_params get_zlib_params() {
+ boost::iostreams::zlib_params ret;
+ ret.method = boost::iostreams::zlib::deflated;
+ ret.noheader = true;
+ return ret;
+}
+}
+
+
static string toString(const ValidSchema& schema)
{
ostringstream oss;
@@ -50,9 +64,10 @@ static string toString(const ValidSchema
}
DataFileWriterBase::DataFileWriterBase(const char* filename,
- const ValidSchema& schema, size_t syncInterval) :
+ const ValidSchema& schema, size_t syncInterval, Codec codec) :
filename_(filename), schema_(schema), encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
+ codec_(codec),
stream_(fileOutputStream(filename)),
buffer_(memoryOutputStream()),
sync_(makeSync()), objectCount_(0)
@@ -64,6 +79,13 @@ DataFileWriterBase::DataFileWriterBase(c
}
setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+ if (codec_ == NULL_CODEC) {
+ setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+ } else if (codec_ == DEFLATE_CODEC) {
+ setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
+ } else {
+ throw Exception("Unknown codec codec");
+ }
setMetadata(AVRO_SCHEMA_KEY, toString(schema));
writeHeader();
@@ -89,12 +111,35 @@ void DataFileWriterBase::sync()
encoderPtr_->init(*stream_);
avro::encode(*encoderPtr_, objectCount_);
- int64_t byteCount = buffer_->byteCount();
- avro::encode(*encoderPtr_, byteCount);
- encoderPtr_->flush();
-
- auto_ptr<InputStream> in = memoryInputStream(*buffer_);
- copy(*in, *stream_);
+ if (codec_ == NULL_CODEC) {
+ int64_t byteCount = buffer_->byteCount();
+ avro::encode(*encoderPtr_, byteCount);
+ encoderPtr_->flush();
+ std::auto_ptr<InputStream> in = memoryInputStream(*buffer_);
+ copy(*in, *stream_);
+ } else {
+ std::vector<char> buf;
+ {
+ boost::iostreams::filtering_ostream os;
+ if (codec_ == DEFLATE_CODEC) {
+ os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
+ }
+ os.push(boost::iostreams::back_inserter(buf));
+ const uint8_t* data;
+ size_t len;
+
+ std::auto_ptr<InputStream> input = memoryInputStream(*buffer_);
+ while (input->next(&data, &len)) {
+ boost::iostreams::write(os, reinterpret_cast<const
char*>(data), len);
+ }
+ } // make sure all is flushed
+ std::auto_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t*>(&buf[0]), buf.size());
+ int64_t byteCount = buf.size();
+ avro::encode(*encoderPtr_, byteCount);
+ encoderPtr_->flush();
+ copy(*in, *stream_);
+ }
encoderPtr_->init(*stream_);
avro::encode(*encoderPtr_, sync_);
@@ -272,8 +317,30 @@ bool DataFileReaderBase::readDataBlock()
decoder_->init(*stream_);
auto_ptr<InputStream> st = boundedInputStream(*stream_,
static_cast<size_t>(byteCount));
- dataDecoder_->init(*st);
- dataStream_ = st;
+ if (codec_ == NULL_CODEC) {
+ dataDecoder_->init(*st);
+ dataStream_ = st;
+ } else {
+ compressed_.clear();
+ const uint8_t* data;
+ size_t len;
+ while (st->next(&data, &len)) {
+ compressed_.insert(compressed_.end(), data, data + len);
+ }
+ // boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
+ os_.reset(new boost::iostreams::filtering_istream());
+ if (codec_ == DEFLATE_CODEC) {
+ os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
+ } else {
+ throw Exception("Bad codec");
+ }
+ os_->push(boost::iostreams::basic_array_source<char>(
+ &compressed_[0], compressed_.size()));
+
+ std::auto_ptr<InputStream> in = istreamInputStream(*os_);
+ dataDecoder_->init(*in);
+ dataStream_ = in;
+ }
return true;
}
@@ -318,8 +385,13 @@ void DataFileReaderBase::readHeader()
}
it = metadata_.find(AVRO_CODEC_KEY);
- if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
- throw Exception("Unknown codec in data file: " + toString(it->second));
+ if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
+ codec_ = DEFLATE_CODEC;
+ } else {
+ codec_ = NULL_CODEC;
+ if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
+ throw Exception("Unknown codec in data file: " +
toString(it->second));
+ }
}
avro::decode(*decoder_, sync_);
Modified: avro/trunk/lang/c++/test/DataFileTests.cc
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/test/DataFileTests.cc?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/test/DataFileTests.cc (original)
+++ avro/trunk/lang/c++/test/DataFileTests.cc Tue Jan 7 21:53:54 2014
@@ -76,7 +76,7 @@ template <typename T> struct codec_trait
avro::encode(e, c.re);
avro::encode(e, c.im);
}
-
+
static void decode(Decoder& d, Complex<T>& c) {
avro::decode(d, c.re);
avro::decode(d, c.im);
@@ -147,7 +147,7 @@ public:
void testCleanup() {
BOOST_CHECK(boost::filesystem::remove(filename));
}
-
+
void testWrite() {
avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100);
int64_t re = 3;
@@ -281,7 +281,7 @@ public:
Pair p(writerSchema, GenericDatum());
int64_t re = 3;
int64_t im = 5;
-
+
const GenericDatum& ci = p.second;
while (df.read(p)) {
BOOST_REQUIRE_EQUAL(ci.type(), avro::AVRO_RECORD);
@@ -388,6 +388,37 @@ public:
}
BOOST_CHECK_EQUAL(i, count);
}
+ /**
+ * Test writing a deflate-compressed data file and reading it back.
+ */
+ void testZip() {
+ const size_t number_of_objects = 100;
+ // first write number_of_objects records to a deflate-compressed file
+ ValidSchema dschema = avro::compileJsonSchemaFromString(sch);
+ {
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, dschema, 16 * 1024, avro::DEFLATE_CODEC);
+
+ for (size_t i = 0; i < number_of_objects; ++i) {
+ ComplexInteger d;
+ d.re = i;
+ d.im = 2 * i;
+ writer.write(d);
+ }
+ }
+ {
+ avro::DataFileReader<ComplexInteger> reader(filename, dschema);
+ std::vector<int> found;
+ ComplexInteger record;
+ while (reader.read(record)) {
+ found.push_back(record.re);
+ }
+ BOOST_CHECK_EQUAL(found.size(), number_of_objects);
+ for (unsigned int i = 0; i < found.size(); ++i) {
+ BOOST_CHECK_EQUAL(found[i], i);
+ }
+ }
+ }
};
void addReaderTests(test_suite* ts, const shared_ptr<DataFileTest>& t)
@@ -403,7 +434,7 @@ void addReaderTests(test_suite* ts, cons
}
test_suite*
-init_unit_test_suite( int argc, char* argv[] )
+init_unit_test_suite( int argc, char* argv[] )
{
test_suite* ts= BOOST_TEST_SUITE("DataFile tests");
shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
@@ -430,5 +461,9 @@ init_unit_test_suite( int argc, char* ar
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGenericByName, t5));
addReaderTests(ts, t5);
+ shared_ptr<DataFileTest> t6(new DataFileTest("test6.df", dsch, dblsch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZip, t6));
+
+
return ts;
}