fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r1007197455
##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -572,5 +576,549 @@ TEST_F(TestPrimitiveReader,
TestNonDictionaryEncodedPagesWithExposeEncoding) {
pages_.clear();
}
+class RecordReaderTest : public ::testing::Test {
+ public:
+ const int32_t kNullValue = -1;
+
+ void Init(int32_t max_def_level, int32_t max_rep_level, Repetition::type
repetition) {
+ level_info_.def_level = max_def_level;
+ level_info_.rep_level = max_rep_level;
+ repetition_type_ = repetition;
+
+ NodePtr type = schema::Int32("b", repetition);
+ descr_ = std::make_unique<ColumnDescriptor>(type, level_info_.def_level,
+ level_info_.rep_level);
+
+ record_reader_ = internal::RecordReader::Make(descr_.get(), level_info_);
+ }
+
+ void CheckReadValues(std::vector<int32_t> expected_values,
+ std::vector<int16_t> expected_defs,
+ std::vector<int16_t> expected_reps) {
+ const auto read_values = reinterpret_cast<const
int32_t*>(record_reader_->values());
+ std::vector<int32_t> read_vals(read_values,
+ read_values +
record_reader_->values_written());
+ ASSERT_EQ(read_vals.size(), expected_values.size());
+ for (size_t i = 0; i < expected_values.size(); ++i) {
+ if (expected_values[i] != kNullValue) {
+ ASSERT_EQ(expected_values[i], read_values[i]);
+ }
+ }
+
+ if (repetition_type_ != Repetition::REQUIRED) {
+ std::vector<int16_t> read_defs(
+ record_reader_->def_levels(),
+ record_reader_->def_levels() + record_reader_->levels_position());
+ ASSERT_TRUE(vector_equal(expected_defs, read_defs));
+ }
+
+ if (repetition_type_ == Repetition::REPEATED) {
+ std::vector<int16_t> read_reps(
+ record_reader_->rep_levels(),
+ record_reader_->rep_levels() + record_reader_->levels_position());
+ ASSERT_TRUE(vector_equal(expected_reps, read_reps));
+ }
+ }
+
+ void CheckState(int64_t values_written, int64_t null_count, int64_t
levels_written,
+ int64_t levels_position) {
+ ASSERT_EQ(record_reader_->values_written(), values_written);
+ ASSERT_EQ(record_reader_->null_count(), null_count);
+ ASSERT_EQ(record_reader_->levels_written(), levels_written);
+ ASSERT_EQ(record_reader_->levels_position(), levels_position);
+ }
+
+ protected:
+ std::shared_ptr<internal::RecordReader> record_reader_;
+ std::unique_ptr<ColumnDescriptor> descr_;
+ internal::LevelInfo level_info_;
+ Repetition::type repetition_type_;
+};
+
+// Tests reading a repeated field using the RecordReader.
+TEST_F(RecordReaderTest, BasicReadRepeatedField) {
+ Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+
+ // Records look like: {[10], [20, 20], [30, 30, 30]}
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+ std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+ std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), values, /*num_values=*/static_cast<int>(def_levels.size()),
+ Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
+ level_info_.rep_level);
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
+ ASSERT_EQ(records_read, 2);
+ CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/6,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{10, 20, 20}, /*expected_defs=*/{1, 1,
1},
+ /*expected_reps=*/{0, 0, 1});
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/3,
+ /*levels_position=*/0);
+}
+
+// Test that we can skip a required top-level field.
+TEST_F(RecordReaderTest, SkipRequiredTopLevel) {
+ Init(/*max_def_level=*/0, /*max_rep_level=*/0, Repetition::REQUIRED);
+
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
+ Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, /*def_levels=*/{}, level_info_.def_level,
+ /*rep_levels=*/{}, level_info_.rep_level);
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/3);
+ ASSERT_EQ(records_skipped, 3);
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
+ ASSERT_EQ(records_read, 2);
+ CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+ CheckReadValues(/*expected_values=*/{30, 30}, /*expected_defs=*/{},
+ /*expected_reps=*/{});
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+}
+
+// Skip an optional field. Intentionally included some null values.
+TEST_F(RecordReaderTest, SkipOptional) {
+ Init(/*max_def_level=*/1, /*max_rep_level=*/0, Repetition::OPTIONAL);
+
+ // Records look like {null, 10, 20, null, 30, 40, 50, 60}
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {10, 20, 30, 40, 50, 60};
+ std::vector<int16_t> def_levels = {0, 1, 1, 0, 1, 1, 1, 1};
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
+ Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, level_info_.def_level,
+ /*rep_levels=*/{}, level_info_.rep_level);
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ {
+ // Skip {null, 10}
+ // This also tests when we start with a Skip.
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2);
+ ASSERT_EQ(records_skipped, 2);
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+ }
+
+ {
+ // Read 3 records: {20, null, 30}
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/3);
+
+ ASSERT_EQ(records_read, 3);
+ // values_written() includes null values.
+ // We had skipped 2 of the levels above. So there are only 6 left in total
+ // to read, and we read 3 of them here.
+ CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/6,
+ /*levels_position=*/3);
+
+ // ReadRecords for optional fields uses ReadValuesSpaced, so there is a
+ // placeholder for null.
+ CheckReadValues(/*expected_values=*/{20, kNullValue, 30},
/*expected_defs=*/{1, 0, 1},
+ /*expected_reps=*/{});
+ }
+
+ {
+ // Skip {40, 50}.
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2);
+ ASSERT_EQ(records_skipped, 2);
+ CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/4,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, kNullValue, 30},
/*expected_defs=*/{1, 0, 1},
+ /*expected_reps=*/{});
+ // Reset after a Skip.
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+ /*levels_position=*/0);
+ }
+
+ {
+ // Read to the end of the column. Read {60}
+ // This test checks that ReadAndThrowAwayValues works, since if it
+ // does not we would read the wrong values.
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/1,
+ /*levels_position=*/1);
+ CheckReadValues(/*expected_values=*/{60},
+ /*expected_defs=*/{1},
+ /*expected_reps=*/{});
+ }
+
+ // We have exhausted all the records.
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/3), 0);
+ ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3), 0);
+}
+
+// Test skipping for repeated fields.
+TEST_F(RecordReaderTest, SkipRepeated) {
+ Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+
+ // Records look like {null, [20, 20, 20], null, [30, 30], [40]}
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {20, 20, 20, 30, 30, 40};
+ std::vector<int16_t> def_levels = {0, 1, 1, 1, 0, 1, 1, 1};
+ std::vector<int16_t> rep_levels = {0, 0, 1, 1, 0, 0, 1, 0};
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
+ Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
+ level_info_.rep_level);
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ {
+ // This should skip the first null record.
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/1);
+ ASSERT_EQ(records_skipped, 1);
+ ASSERT_EQ(record_reader_->values_written(), 0);
+ ASSERT_EQ(record_reader_->null_count(), 0);
+ // For repeated fields, we need to read the levels to find the record
+ // boundaries and skip. So some levels are read, however, the skipped
+ // level should not be there after the skip. That's why levels_position()
+ // is 0.
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
+ /*levels_position=*/0);
+ CheckReadValues(/*expected_values=*/{},
+ /*expected_defs=*/{},
+ /*expected_reps=*/{});
+ }
+
+ {
+ // Read [20, 20, 20]
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/7,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, 20, 20},
+ /*expected_defs=*/{1, 1, 1},
+ /*expected_reps=*/{0, 1, 1});
+ }
+
+ {
+ // Skip the null record and also skip [30, 30]
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2);
+ ASSERT_EQ(records_skipped, 2);
+ // We remove the skipped levels from the buffer.
+ CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/4,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, 20, 20},
+ /*expected_defs=*/{1, 1, 1},
+ /*expected_reps=*/{0, 1, 1});
+ }
+
+ {
+ // Read [40]
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/4, /*null_count=*/0, /*levels_written=*/4,
+ /*levels_position=*/4);
+ CheckReadValues(/*expected_values=*/{20, 20, 20, 40},
+ /*expected_defs=*/{1, 1, 1, 1},
+ /*expected_reps=*/{0, 1, 1, 0});
+ }
+}
+
+// Tests that for repeated fields, we first consume what is in the buffer
+// before reading more levels.
+TEST_F(RecordReaderTest, SkipRepeatedConsumeBufferFirst) {
+ Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values(2048, 10);
+ std::vector<int16_t> def_levels(2048, 1);
+ std::vector<int16_t> rep_levels(2048, 0);
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
+ Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
+ level_info_.rep_level);
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+ {
+ // Read 1000 records. We will read 1024 levels because that is the minimum
+ // number of levels to read.
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1000);
+ ASSERT_EQ(records_read, 1000);
+ CheckState(/*values_written=*/1000, /*null_count=*/0,
/*levels_written=*/1024,
+ /*levels_position=*/1000);
+ std::vector<int32_t> expected_values(1000, 10);
+ std::vector<int16_t> expected_def_levels(1000, 1);
+ std::vector<int16_t> expected_rep_levels(1000, 0);
+ CheckReadValues(expected_values, expected_def_levels, expected_rep_levels);
+ // Reset removes the already consumed values and levels.
+ record_reader_->Reset();
+ }
+
+ { // Skip 12 records. Since we already have 24 in the buffer, we should not
+ // be reading any more levels into the buffer; we will just consume 12 of them.
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/12);
+ ASSERT_EQ(records_skipped, 12);
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/12,
+ /*levels_position=*/0);
+ // Everything is empty because we reset the reader before this skip.
+ CheckReadValues(/*expected_values=*/{}, /*expected_def_levels=*/{},
+ /*expected_rep_levels=*/{});
+ }
+}
+
+// Test reading when one record spans multiple pages for a repeated field.
+TEST_F(RecordReaderTest, ReadPartialRecord) {
+ Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+
+ std::vector<std::shared_ptr<Page>> pages;
+
+ // Page 1: {[10], [20, 20, 20 ... } continues to next page.
+ {
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), /*values=*/{10, 20, 20, 20}, /*num_values=*/4,
Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info_.def_level,
+ /*rep_levels=*/{0, 0, 1, 1}, level_info_.rep_level);
+ pages.push_back(std::move(page));
+ }
+
+ // Page 2: {... 20, 20, ...} continues from previous page and to next page.
+ {
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level,
+ /*rep_levels=*/{1, 1}, level_info_.rep_level);
+ pages.push_back(std::move(page));
+ }
+
+ // Page 3: { ... 20], [30]} continues from previous page.
+ {
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_.get(), /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level,
+ /*rep_levels=*/{1, 0}, level_info_.rep_level);
+ pages.push_back(std::move(page));
+ }
+
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ {
+ // Read [10]
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/4,
+ /*levels_position=*/1);
+ CheckReadValues(/*expected_values=*/{10},
+ /*expected_defs=*/{1},
+ /*expected_reps=*/{0});
+ }
+
+ {
+ // Read [20, 20, 20, 20, 20, 20] that spans multiple pages.
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/7, /*null_count=*/0, /*levels_written=*/8,
+ /*levels_position=*/7);
+ CheckReadValues(/*expected_values=*/{10, 20, 20, 20, 20, 20, 20},
+ /*expected_defs=*/{1, 1, 1, 1, 1, 1, 1},
+ /*expected_reps=*/{0, 0, 1, 1, 1, 1, 1});
+ }
+
+ {
+ // Read [30]
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/8, /*null_count=*/0, /*levels_written=*/8,
+ /*levels_position=*/8);
+ CheckReadValues(/*expected_values=*/{10, 20, 20, 20, 20, 20, 20, 30},
+ /*expected_defs=*/{1, 1, 1, 1, 1, 1, 1, 1},
+ /*expected_reps=*/{0, 0, 1, 1, 1, 1, 1, 0});
+ }
+}
+
+// Test skipping for repeated fields for the case when one record spans
multiple
+// pages.
+TEST_F(RecordReaderTest, SkipPartialRecord) {
+ Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+
+ std::vector<std::shared_ptr<Page>> pages;
+
+ // Page 1: {[10], [20, 20, 20 ... } continues to next page.
Review Comment:
Similarly, reading nulls is already tested in TestInt32FlatRepeated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]