pitrou commented on code in PR #14355:
URL: https://github.com/apache/arrow/pull/14355#discussion_r1043437419
##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,566 @@ TEST(ReaderTest, FailOnInvalidEOF) {
}
}
+class StreamingReaderTestBase {
+ public:
+ virtual ~StreamingReaderTestBase() = default;
+
+ protected:
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+ auto buffer = std::make_shared<Buffer>(str);
+ return std::make_shared<io::BufferReader>(std::move(buffer));
+ }
+ // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+ return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+ }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, parse_options_,
+                                 io_context_, executor_);
+ }
+ template <typename... Args>
+ Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+ return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+ }
+
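+  // Adapts a reader into an AsyncGenerator that yields batches via ReadNextAsync()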
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+ return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+ }
+ template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+ return MakeGenerator(std::move(reader));
+ }
+
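+  // Reads the next batch, asserting that it is valid and not the end of the stream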
+ static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+ std::shared_ptr<RecordBatch>* out) {
+ ASSERT_OK(reader->ReadNext(out));
+ ASSERT_FALSE(IsIterationEnd(*out));
+ ASSERT_OK((**out).ValidateFull());
+ }
+ static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+ std::shared_ptr<RecordBatch> out;
+ ASSERT_OK(reader->ReadNext(&out));
+ ASSERT_TRUE(IsIterationEnd(out));
+ }
+
+ struct TestCase {
+ std::string json;
+ int json_size;
+ int block_size;
+ int num_rows;
+ int num_batches;
+ std::shared_ptr<Schema> schema;
+ RecordBatchVector batches;
+ std::shared_ptr<Table> table;
+ };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size, i.e. a higher multiplier
+  // means fewer batches.
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+ FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+ field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+ TestCase out;
+ out.schema = schema(fields);
+ out.num_rows = num_rows;
+
+ constexpr int kSeed = 0x432432;
+ std::default_random_engine engine(kSeed);
+ std::vector<std::string> rows(num_rows);
+ size_t max_row_size = 1;
+
+ auto options = GenerateOptions::Defaults();
+ options.null_probability = 0;
+ for (int i = 0; i < num_rows; ++i) {
+ StringBuffer string_buffer;
+ Writer writer(string_buffer);
+ ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+ std::string json = string_buffer.GetString();
+ rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+ max_row_size = std::max(max_row_size, rows[i].size());
+ }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+ // Deduce the expected record batches from the target block size.
+ std::vector<std::string> batch_rows;
+ size_t pos = 0;
+ for (const auto& row : rows) {
+ pos += row.size();
+ if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+ batch_rows.clear();
+ pos -= block_size;
+ }
+ batch_rows.push_back(row);
+ out.json += row;
+ }
+ if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+ }
+
+ out.json_size = static_cast<int>(out.json.size());
+ out.block_size = static_cast<int>(block_size);
+ out.num_batches = static_cast<int>(out.batches.size());
+ out.table = *Table::FromRecordBatches(out.batches);
+
+ return out;
+ }
+
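+  // Concatenates `strings`, inserting `delim` between elements (and after the last
+  // element when `trailing_delim` is true)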
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+ std::string out;
+ for (size_t i = 0; i < strings.size();) {
+ out += strings[i++];
+ if (i != strings.size() || trailing_delim) {
+ out += delim;
+ }
+ }
+ return out;
+ }
+
+ internal::Executor* executor_ = nullptr;
+ ParseOptions parse_options_ = ParseOptions::Defaults();
+ ReadOptions read_options_ = ReadOptions::Defaults();
+ io::IOContext io_context_ = io::default_io_context();
+};
+
+class AsyncStreamingReaderTest : public StreamingReaderTestBase, public ::testing::Test {
+ protected:
+ void SetUp() override { read_options_.use_threads = true; }
+};
+
+class StreamingReaderTest : public StreamingReaderTestBase,
+ public ::testing::TestWithParam<bool> {
+ protected:
+ void SetUp() override { read_options_.use_threads = GetParam(); }
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+ ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+ ASSERT_RAISES(Invalid, MakeReader(""));
+ std::string data(100, '\n');
+ for (auto block_size : {25, 49, 50, 100, 200}) {
+ read_options_.block_size = block_size;
+ ASSERT_RAISES(Invalid, MakeReader(data));
+ }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+ constexpr double kIoLatency = 1e-3;
+
+ auto test_schema = schema({field("i", int64())});
+ // Object straddles multiple blocks
+ auto bad_first_chunk = Join(
+ {
+ R"({"i": 0 })",
+ R"({"i": 1})",
+ },
+ "\n");
+ auto bad_middle_chunk = Join(
+ {
+ R"({"i": 0})",
+ R"({"i": 1})",
+ R"({"i": 2})",
+ },
+ "\n");
+
+ read_options_.block_size = 10;
+ ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+ ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+ std::shared_ptr<RecordBatch> batch;
+ AssertReadNext(reader, &batch);
+ EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+ ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+ EXPECT_EQ(reader->bytes_processed(), 9);
+ AssertReadEnd(reader);
+ AssertReadEnd(reader);
+ EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+ auto test_schema = schema({field("n", int64())});
+ auto bad_first_block = Join(
+ {
+ R"({"n": })",
+ R"({"n": 10000})",
+ },
+ "\n");
+ auto bad_first_block_after_empty = Join(
+ {
+ R"( )",
+ R"({"n": })",
+ R"({"n": 10000})",
+ },
+ "\n");
+ auto bad_middle_block = Join(
+ {
+ R"({"n": 10000})",
+ R"({"n": 200 0})",
+ R"({"n": 30000})",
+ },
+ "\n");
+
+ read_options_.block_size = 16;
+ ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+ ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+ EXPECT_EQ(reader->bytes_processed(), 0);
+ AssertSchemaEqual(reader->schema(), test_schema);
+
+ AssertReadNext(reader, &batch);
+ EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+ ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+ EXPECT_EQ(reader->bytes_processed(), 13);
+ AssertReadEnd(reader);
+ EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+ auto test_schema = schema({field("i", int64())});
+ auto bad_first_block = Join(
+ {
+ R"({"i":0}{1})",
+ R"({"i":2})",
+ },
+ "\n");
+ auto bad_middle_blocks = Join(
+ {
+ R"({"i": 0})",
+ R"({"i": 1})",
+ R"({}"i":2})",
+ R"({"i": 3})",
+ },
+ "\n");
+
+ std::shared_ptr<RecordBatch> batch;
+ std::shared_ptr<StreamingReader> reader;
+ Status status;
+ read_options_.block_size = 10;
+ parse_options_.newlines_in_values = true;
+
+ ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+ AssertReadNext(reader, &batch);
+ EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+ status = reader->ReadNext(&batch);
+ EXPECT_EQ(reader->bytes_processed(), 7);
+ ASSERT_RAISES(Invalid, status);
+ EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
Review Comment:
For the record, we have `EXPECT_RAISES_WITH_MESSAGE_THAT`, which may make such checks more compact. No obligation though.
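
For illustration, a minimal sketch of that suggestion (untested; it assumes the matcher is applied to the full `Status::ToString()`, which carries the `Invalid: ` prefix, so `HasSubstr` is used here rather than `StartsWith`):

```cpp
// Collapses the separate ASSERT_RAISES / EXPECT_THAT pair into one check.
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("JSON parse error"),
                                reader->ReadNext(&batch));
```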