This is an automated email from the ASF dual-hosted git repository.
thiru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new d2f92b4 Fixed C++ data file reader to handle zero-object blocks (#414)
d2f92b4 is described below
commit d2f92b445cf07683b7aa9afd2f69e2b23a033e6b
Author: Thiruvalluvan M G <[email protected]>
AuthorDate: Tue Dec 25 07:40:02 2018 +0530
Fixed C++ data file reader to handle zero-object blocks (#414)
---
lang/c++/api/DataFile.hh | 2 +-
lang/c++/impl/DataFile.cc | 42 +++++++++++++++++++++++-------------------
lang/c++/test/DataFileTests.cc | 25 ++++++++++++++++++++++---
3 files changed, 46 insertions(+), 23 deletions(-)
diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh
index 1a95296..5e2919c 100644
--- a/lang/c++/api/DataFile.hh
+++ b/lang/c++/api/DataFile.hh
@@ -206,7 +206,7 @@ class AVRO_DECL DataFileReaderBase : boost::noncopyable {
std::string uncompressed;
void readHeader();
- bool readDataBlock();
+ void readDataBlock();
void doSeek(int64_t position);
public:
/**
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index ad645ec..5b8209b 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -317,21 +317,23 @@ std::ostream& operator << (std::ostream& os, const DataFileSync& s)
bool DataFileReaderBase::hasMore()
{
- if (eof_) {
- return false;
- } else if (objectCount_ != 0) {
- return true;
- }
+ for (; ;) {
+ if (eof_) {
+ return false;
+ } else if (objectCount_ != 0) {
+ return true;
+ }
- dataDecoder_->init(*dataStream_);
- drain(*dataStream_);
- DataFileSync s;
- decoder_->init(*stream_);
- avro::decode(*decoder_, s);
- if (s != sync_) {
- throw Exception("Sync mismatch");
+ dataDecoder_->init(*dataStream_);
+ drain(*dataStream_);
+ DataFileSync s;
+ decoder_->init(*stream_);
+ avro::decode(*decoder_, s);
+ if (s != sync_) {
+ throw Exception("Sync mismatch");
+ }
+ readDataBlock();
}
- return readDataBlock();
}
class BoundedInputStream : public InputStream {
@@ -377,7 +379,7 @@ unique_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit)
return unique_ptr<InputStream>(new BoundedInputStream(in, limit));
}
-bool DataFileReaderBase::readDataBlock()
+void DataFileReaderBase::readDataBlock()
{
decoder_->init(*stream_);
blockStart_ = stream_->byteCount();
@@ -385,7 +387,7 @@ bool DataFileReaderBase::readDataBlock()
size_t n = 0;
if (! stream_->next(&p, &n)) {
eof_ = true;
- return false;
+ return;
}
stream_->backup(n);
avro::decode(*decoder_, objectCount_);
@@ -452,7 +454,6 @@ bool DataFileReaderBase::readDataBlock()
dataDecoder_->init(*in);
dataStream_ = std::move(in);
}
- return true;
}
void DataFileReaderBase::close()
@@ -515,7 +516,8 @@ void DataFileReaderBase::readHeader()
blockStart_ = stream_->byteCount();
}
-void DataFileReaderBase::doSeek(int64_t position) {
+void DataFileReaderBase::doSeek(int64_t position)
+{
if (SeekableInputStream *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) {
if (!eof_) {
dataDecoder_->init(*dataStream_);
@@ -529,12 +531,14 @@ void DataFileReaderBase::doSeek(int64_t position) {
}
}
-void DataFileReaderBase::seek(int64_t position) {
+void DataFileReaderBase::seek(int64_t position)
+{
doSeek(position);
readDataBlock();
}
-void DataFileReaderBase::sync(int64_t position) {
+void DataFileReaderBase::sync(int64_t position)
+{
doSeek(position);
DataFileSync sync_buffer;
const uint8_t *p = 0;
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 7a7f1d9..acdb16a 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -47,7 +47,7 @@ using avro::GenericDatum;
using avro::GenericRecord;
using avro::NodePtr;
-const int count = 1000;
+const int DEFAULT_COUNT = 1000;
template <typename T>
struct Complex {
@@ -169,11 +169,13 @@ class DataFileTest {
const char* filename;
const ValidSchema writerSchema;
const ValidSchema readerSchema;
+ const int count;
public:
- DataFileTest(const char* f, const char* wsch, const char* rsch) :
+ DataFileTest(const char* f, const char* wsch, const char* rsch,
+ int count = DEFAULT_COUNT) :
filename(f), writerSchema(makeValidSchema(wsch)),
- readerSchema(makeValidSchema(rsch)) { }
+ readerSchema(makeValidSchema(rsch)), count(count) { }
typedef pair<ValidSchema, GenericDatum> Pair;
@@ -189,6 +191,8 @@ public:
ComplexInteger c(re, im);
df.write(c);
}
+ // Simulate writing an empty block.
+ df.flush();
df.close();
}
@@ -399,8 +403,16 @@ public:
std::set<int64_t> sync_points_syncing;
std::set<int64_t> sync_points_reading;
{
+ /*
+ * sync() will stop at a block with 0 objects. But read()
+ * will transparently skip such blocks. So this test will
+ * fail if there are blocks with zero objects. In order to
+ * avoid such failures, we read one object after sync.
+ */
avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ ComplexInteger ci;
for (int64_t prev = 0; prev != df.previousSync(); df.sync(prev)) {
+ df.read(ci);
prev = df.previousSync();
sync_points_syncing.insert(prev);
}
@@ -632,6 +644,13 @@ test_suite*
init_unit_test_suite(int argc, char *argv[])
{
{
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test0.df");
+ shared_ptr<DataFileTest> t1(new DataFileTest("test1.d0", sch, isch, 0));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));
+ addReaderTests(ts, t1);
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test1.df");
shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));