benibus commented on code in PR #14355:
URL: https://github.com/apache/arrow/pull/14355#discussion_r1037416823
##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_,
+                                 read_options_, parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0 })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i": 1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {

Review Comment:
   Ran into some issues with this one. It seems that the non-newline-delimited chunker doesn't propagate errors for input like `{"a":0}}`. Instead it fails on a `DCHECK` [here](https://github.com/benibus/arrow/blob/1c60039f780a6eebf8106529dd30778dcb8c11f7/cpp/src/arrow/json/chunker.cc#L134). From what I can tell, this is because `partial` will not be the start of a valid object after the first chunk is processed (and validated by the parser). It does look like an easy fix, though.
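   For reference, a rough sketch of the kind of regression test this might call for once the chunker is fixed. It reuses the fixture helpers shown above (`MakeReader`, `read_options_`, `parse_options_`); the test name, the `block_size` value, and the assumption that the error should surface as `Status::Invalid` (rather than the `DCHECK`) are my own, not something already in this PR:

   ```cpp
   // Hypothetical regression test (not in this PR): with the structure-aware
   // chunker, trailing garbage after a valid object should surface as
   // Status::Invalid rather than tripping the DCHECK in chunker.cc.
   TEST_P(StreamingReaderTest, PropagateChunkingErrorsWithoutNewlines) {
     // Opt into the chunker that parses JSON structure instead of splitting
     // the input on newlines.
     parse_options_.newlines_in_values = true;
     // A small block size so `{"a":0}}` spans more than one block and the
     // stray '}' ends up in `partial` without being the start of an object.
     read_options_.block_size = 4;

     auto maybe_reader = MakeReader(R"({"a":0}})");
     if (!maybe_reader.ok()) {
       // The error may already be detected while reading the first blocks.
       ASSERT_RAISES(Invalid, maybe_reader);
       return;
     }
     // Otherwise it should propagate from a subsequent ReadNext call.
     std::shared_ptr<RecordBatch> batch;
     Status status;
     do {
       status = (*maybe_reader)->ReadNext(&batch);
     } while (status.ok() && !IsIterationEnd(batch));
     ASSERT_RAISES(Invalid, status);
   }
   ```

   Since I'm not sure whether the fixed chunker would report the problem during the initial read or only on a later `ReadNext`, the sketch accepts either.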