fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r988441666


##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -572,5 +576,615 @@ TEST_F(TestPrimitiveReader, 
TestNonDictionaryEncodedPagesWithExposeEncoding) {
   pages_.clear();
 }
 
+// Tests reading a repeated field using the RecordReader.
+TEST(RecordReaderTest, BasicReadRepeatedField) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like: {[10], [20, 20], [30, 30, 30]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(def_levels.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 3);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 6);
+  ASSERT_EQ(record_reader->levels_position(), 3);
+
+  const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + 
record_reader->values_written());
+  std::vector<int16_t> read_defs(
+      record_reader->def_levels(),
+      record_reader->def_levels() + record_reader->levels_position());
+  std::vector<int16_t> read_reps(
+      record_reader->rep_levels(),
+      record_reader->rep_levels() + record_reader->levels_position());
+
+  ASSERT_THAT(read_vals, ElementsAre(10, 20, 20));
+  ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+  ASSERT_THAT(read_reps, ElementsAre(0, 0, 1));
+}
+
+// Test that we can skip required top level field.
+TEST(RecordReaderTest, SkipRequiredTopLevel) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 0;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, /*def_levels=*/{}, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/3);
+
+  ASSERT_EQ(records_skipped, 3);
+  ASSERT_EQ(record_reader->values_written(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 2);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + 
record_reader->values_written());
+
+  // There are no null values to account for here.
+  ASSERT_THAT(read_vals, ElementsAre(30, 30));
+}
+
+// Skip an optional field. Intentionally included some null values.
+TEST(RecordReaderTest, SkipOptional) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like {null, 10, 20, 30, null, 40, 50, 60}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 30, 40, 50, 60};
+  std::vector<int16_t> def_levels = {0, 1, 1, 0, 1, 1, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // Skip {null, 10}
+    // This also tests when we start with a Skip.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+    ASSERT_EQ(record_reader->values_written(), 0);
+    // Since we started with skipping, there was nothing in the buffer to 
consume
+    // for skipping.
+    ASSERT_EQ(record_reader->levels_written(), 0);
+    ASSERT_EQ(record_reader->levels_position(), 0);
+  }
+
+  {
+    // Read 3 records: {20, null, 30}
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/3);
+
+    ASSERT_EQ(records_read, 3);
+    // One of these values is null. values_written() includes null values.
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 1);
+    // When we read, we buffer some levels. Since there are only a few levels
+    // in our whole column, all of them are read.
+    // We had skipped 2 of the levels above. So there is only 6 left in total 
to
+    // read, and we read 3 of them here.
+    ASSERT_EQ(record_reader->levels_written(), 6);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    // ReadRecords for optional fields uses ReadValuesSpaced, so there is a
+    // placeholder for null.
+    ASSERT_EQ(read_vals[0], 20);
+    // read_vals[1] is a space for null.
+    ASSERT_EQ(read_vals[2], 30);
+    ASSERT_THAT(read_defs, ElementsAre(1, 0, 1));
+  }
+
+  {
+    // Skip {40, 50}.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+  }
+
+  {
+    // Read to the end of the column. Read {60}
+    // This test checks that ReadAndThrowAwayValues works, since if it
+    // does not we would read the wrong values.
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 4);
+    ASSERT_EQ(record_reader->null_count(), 1);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 4);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_EQ(read_vals[0], 20);
+    // read_vals[1] is a space for null.
+    ASSERT_EQ(read_vals[2], 30);
+    ASSERT_EQ(read_vals[3], 60);
+    ASSERT_THAT(read_defs, ElementsAre(1, 0, 1, 1));
+  }
+
+  // We have exhausted all the records.
+  ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/3), 0);
+  ASSERT_EQ(record_reader->SkipRecords(/*num_records=*/3), 0);
+}
+
+// Test skipping for repeated fields.
+TEST(RecordReaderTest, SkipRepeated) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like {null, [20, 20, 20], null, [30, 30], [40]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {20, 20, 20, 30, 30, 40};
+  std::vector<int16_t> def_levels = {0, 1, 1, 1, 0, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 1, 0, 0, 1, 0};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // This should skip the first null record.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_skipped, 1);
+    ASSERT_EQ(record_reader->values_written(), 0);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // For repeated fields, we need to read the levels to find the record
+    // boundaries and skip. So some levels are read, however, the skipped
+    // level should not be there after the skip. That's why levels_position()
+    // is 0.
+    ASSERT_EQ(record_reader->levels_written(), 7);
+    ASSERT_EQ(record_reader->levels_position(), 0);
+  }
+
+  {
+    // Read [20, 20, 20]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 7);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1));
+  }
+
+  {
+    // Skip the null record and also skip [30, 30]
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // We remove the skipped levels from the buffer.
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1));
+  }
+
+  {
+    // Read [40]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 4);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 4);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20, 40));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1, 0));
+  }
+}
+
+// Test reading when one record spans multiple pages for a repeated field.
+TEST(RecordReaderTest, ReadPartialRecord) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::unique_ptr<PageReader> pager;
+
+  // Page 1: {[10], [20, 20, 20 ... } continues to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info.def_level,
+        /*rep_levels=*/{0, 0, 1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 2: {... 20, 20, ...} continues from previous page and to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 3: { ... 20, [30]} continues from previous page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 0}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // Read [10]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_THAT(read_vals, ElementsAre(10));
+  }
+
+  {
+    // Read [20, 20, 20, 20, 20, 20] that spans multiple pages.
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_THAT(read_vals, ElementsAre(10, 20, 20, 20, 20, 20, 20));
+  }
+
+  {
+    // Read [30]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_THAT(read_vals, ElementsAre(10, 20, 20, 20, 20, 20, 20, 30));
+  }
+}
+
+// Test skipping for repeated fields for the case when one record spans 
multiple
+// pages.
+TEST(RecordReaderTest, SkipPartialRecord) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::unique_ptr<PageReader> pager;
+
+  // Page 1: {[10], [20, 20, 20 ... } continues to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info.def_level,
+        /*rep_levels=*/{0, 0, 1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 2: {... 20, 20, ...} continues from previous page and to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 3: { ... 20, [30]} continues from previous page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 0}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // Read [10]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_THAT(read_vals, ElementsAre(10));
+    ASSERT_EQ(record_reader->values_written(), 1);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // There are 4 levels in the first page.
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 1);
+  }
+
+  {
+    // Skip the record that goes across pages.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_skipped, 1);
+    ASSERT_EQ(record_reader->values_written(), 1);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 2);
+    ASSERT_EQ(record_reader->levels_position(), 1);
+  }
+
+  {
+    // Read [30]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 2);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 2);
+    ASSERT_EQ(record_reader->levels_position(), 2);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(10, 30));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 0));
+  }
+}
+
+// Test that Skip works on ByteArrays. Specifically, this is testing
+// ReadAndThrowAwayValues for ByteArrays.
+TEST(RecordReaderTest, SkipByteArray) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 0;
+
+  // Must use REPEATED to excercise ReadAndThrowAwayValues for ByteArrays. It
+  // does not do any buffering for Optional or Required fields as it calls
+  // ResetValues after every read.
+  NodePtr type = schema::ByteArray("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::unique_ptr<PageReader> pager;
+
+  int levels_per_page = 90;
+  int num_pages = 1;
+
+  std::vector<int16_t> def_levels;
+  std::vector<int16_t> rep_levels;
+  std::vector<ByteArray> values;
+  std::vector<uint8_t> buffer;
+
+  MakePages<ByteArrayType>(&descr, num_pages, levels_per_page, def_levels, 
rep_levels,
+                           values, buffer, pages, Encoding::PLAIN);
+
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  // Read one-third of the page.
+  ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/30), 30);
+
+  // Skip 30 records.
+  ASSERT_EQ(record_reader->SkipRecords(/*num_records=*/30), 30);
+
+  // Read 60 more records. Only 30 will be read, since we read 30 and skipped 
30,
+  // so only 30 is left.
+  ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/60), 30);
+
+  auto binary_reader = dynamic_cast<BinaryRecordReader*>(record_reader.get());
+  ASSERT_NE(binary_reader, nullptr);
+  // Chunks are reset after this call.
+  ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks();
+  ASSERT_EQ(array_vector.size(), 1);
+  auto binary_array = 
dynamic_cast<::arrow::BinaryArray*>(array_vector[0].get());
+  ASSERT_NE(binary_array, nullptr);
+  ASSERT_EQ(binary_array->length(), 60);
+
+  // Our values above are not spaced, however, the RecordReader will
+  // read spaced for nullable values.
+  // Create spaced expected values.
+  std::vector<std::string_view> expected_values;
+  size_t values_index = 0;
+  for (int i = 0; i < 90; ++i) {
+    if (def_levels[i] == 0) {
+      expected_values.emplace_back();
+      continue;
+    }
+    expected_values.emplace_back(reinterpret_cast<const 
char*>(values[values_index].ptr),
+                                 values[values_index].len);
+    ++values_index;
+  }
+
+  // Check that the expected values match the actual values.
+  for (size_t i = 0; i < 30; ++i) {
+    ASSERT_EQ(expected_values[i].compare(binary_array->GetView(i)), 0);
+  }
+  // Repeat for the next range that we read.
+  for (size_t i = 60; i < 90; ++i) {
+    ASSERT_EQ(expected_values[i].compare(binary_array->GetView(i - 30)), 0);
+  }

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to