benibus commented on code in PR #14355: URL: https://github.com/apache/arrow/pull/14355#discussion_r1036615799
########## cpp/src/arrow/json/reader_test.cc: ########## @@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) { AssertTablesEqual(*actual_table, *expected_table); } +class StreamingReaderTest : public ::testing::TestWithParam<bool> { + protected: + void SetUp() override { read_options_.use_threads = GetParam(); } + + static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) { + auto buffer = std::make_shared<Buffer>(str); + return std::make_shared<io::BufferReader>(std::move(buffer)); + } + // Stream with simulated latency + static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str, + double latency) { + return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency); + } + + Result<std::shared_ptr<StreamingReader>> MakeReader( + std::shared_ptr<io::InputStream> stream) { + return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_, + parse_options_); + } + template <typename... Args> + Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) { + return MakeReader(MakeTestStream(std::forward<Args>(args)...)); + } + + AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator( + std::shared_ptr<StreamingReader> reader) { + return [reader = std::move(reader)] { return reader->ReadNextAsync(); }; + } + template <typename... Args> + Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) { + ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...)); + return MakeGenerator(std::move(reader)); + } + + static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader, + std::shared_ptr<RecordBatch>* out) { + ASSERT_OK(reader->ReadNext(out)); + ASSERT_FALSE(IsIterationEnd(*out)); + } + static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) { + std::shared_ptr<RecordBatch> out; + ASSERT_OK(reader->ReadNext(&out)); + ASSERT_TRUE(IsIterationEnd(out)); + } + + struct TestCase { + std::string json; + int json_size; + int block_size; + int num_rows; + int num_batches; + std::shared_ptr<Schema> schema; + RecordBatchVector batches; + std::shared_ptr<Table> table; + }; + + // Creates a test case from valid JSON objects with a human-readable index field and a + // struct field of random data. `block_size_multiplier` is applied to the largest + // generated row length to determine the target block_size. i.e - higher multiplier + // means fewer batches + static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) { + FieldVector data_fields = {field("s", utf8()), field("f", float64()), + field("b", boolean())}; + FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))}; + TestCase out; + out.schema = schema(fields); + out.num_rows = num_rows; + + constexpr int kSeed = 0x432432; + std::default_random_engine engine(kSeed); + std::vector<std::string> rows(num_rows); + size_t max_row_size = 1; + + auto options = GenerateOptions::Defaults(); + options.null_probability = 0; + for (int i = 0; i < num_rows; ++i) { + StringBuffer string_buffer; + Writer writer(string_buffer); + ABORT_NOT_OK(Generate(data_fields, engine, &writer, options)); + std::string json = string_buffer.GetString(); + rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"}); + max_row_size = std::max(max_row_size, rows[i].size()); + } + + auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier); + // Deduce the expected record batches from the target block size. + std::vector<std::string> batch_rows; + size_t pos = 0; + for (const auto& row : rows) { + pos += row.size(); + if (pos > block_size) { + out.batches.push_back( + RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"}))); + batch_rows.clear(); + pos -= block_size; + } + batch_rows.push_back(row); + out.json += row; + } + if (!batch_rows.empty()) { + out.batches.push_back( + RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"}))); + } + + out.json_size = static_cast<int>(out.json.size()); + out.block_size = static_cast<int>(block_size); + out.num_batches = static_cast<int>(out.batches.size()); + out.table = *Table::FromRecordBatches(out.batches); + + return out; + } + + static std::string Join(const std::vector<std::string>& strings, + const std::string& delim = "", bool trailing_delim = false) { + std::string out; + for (size_t i = 0; i < strings.size();) { + out += strings[i++]; + if (i != strings.size() || trailing_delim) { + out += delim; + } + } + return out; + } + + internal::Executor* executor_ = internal::GetCpuThreadPool(); + ParseOptions parse_options_ = ParseOptions::Defaults(); + ReadOptions read_options_ = ReadOptions::Defaults(); + io::IOContext io_context_ = io::default_io_context(); +}; + +INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest, + ::testing::Values(false, true)); + +TEST_P(StreamingReaderTest, ErrorOnEmptyStream) { + ASSERT_RAISES(Invalid, MakeReader("")); + std::string data(100, '\n'); + for (auto block_size : {25, 49, 50, 100, 200}) { + read_options_.block_size = block_size; + ASSERT_RAISES(Invalid, MakeReader(data)); + } +} + +TEST_P(StreamingReaderTest, PropagateChunkingErrors) { + constexpr double kIoLatency = 1e-3; + + auto test_schema = schema({field("i", int64())}); + auto bad_first_chunk = Join( + { + R"({"i": 0 })", + R"({"i": 1})", + }, + "\n"); + auto bad_middle_chunk = Join( + { + 
R"({"i": 0})", + R"({"i": 1})", + R"({"i": 2})", + }, + "\n"); + + read_options_.block_size = 10; + ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk)); + + ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency)); + + std::shared_ptr<RecordBatch> batch; + AssertReadNext(reader, &batch); + EXPECT_EQ(reader->bytes_read(), 9); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch); + + ASSERT_RAISES(Invalid, reader->ReadNext(&batch)); + EXPECT_EQ(reader->bytes_read(), 9); + AssertReadEnd(reader); + AssertReadEnd(reader); + EXPECT_EQ(reader->bytes_read(), 9); +} + +TEST_P(StreamingReaderTest, PropagateParsingErrors) { + auto test_schema = schema({field("n", int64())}); + auto bad_first_block = Join( + { + R"({"n": })", + R"({"n": 10000})", + }, + "\n"); + auto bad_first_block_after_empty = Join( + { + R"( )", + R"({"n": })", + R"({"n": 10000})", + }, + "\n"); + auto bad_middle_block = Join( + { + R"({"n": 10000})", + R"({"n": 200 0})", + R"({"n": 30000})", + }, + "\n"); + + read_options_.block_size = 16; + ASSERT_RAISES(Invalid, MakeReader(bad_first_block)); + ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty)); + + std::shared_ptr<RecordBatch> batch; + ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block)); + EXPECT_EQ(reader->bytes_read(), 0); + ASSERT_NE(reader->schema(), nullptr); + EXPECT_EQ(*reader->schema(), *test_schema); + + AssertReadNext(reader, &batch); + EXPECT_EQ(reader->bytes_read(), 13); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch); + + ASSERT_RAISES(Invalid, reader->ReadNext(&batch)); + EXPECT_EQ(reader->bytes_read(), 13); + AssertReadEnd(reader); + EXPECT_EQ(reader->bytes_read(), 13); +} + +TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) { + std::string test_json(32, '\n'); + test_json += R"({"b": true, "s": "foo"})"; + ASSERT_EQ(test_json.length(), 55); + + parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())}); + read_options_.block_size = 24; + ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json)); + EXPECT_EQ(reader->bytes_read(), 0); + + auto expected_schema = parse_options_.explicit_schema; + auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])"); + + ASSERT_NE(reader->schema(), nullptr); + ASSERT_EQ(*reader->schema(), *expected_schema); + + std::shared_ptr<RecordBatch> actual_batch; + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 55); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + AssertReadEnd(reader); +} + +TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) { + std::string test_json = + Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})", + R"({"s": "foo", "t": "2022-01-01", "b": true})"}, + "\n"); + + FieldVector expected_fields = {field("s", utf8())}; + std::shared_ptr<Schema> expected_schema = schema(expected_fields); + std::shared_ptr<RecordBatch> expected_batch; + std::shared_ptr<RecordBatch> actual_batch; + std::shared_ptr<StreamingReader> reader; + + parse_options_.explicit_schema = expected_schema; + parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error; + read_options_.block_size = 48; + ASSERT_RAISES(Invalid, MakeReader(test_json)); + + expected_fields.push_back(field("t", utf8())); + expected_schema = schema(expected_fields); + expected_batch = + RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])"); + + parse_options_.explicit_schema = expected_schema; + ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json)); + ASSERT_NE(reader->schema(), nullptr); + ASSERT_EQ(*reader->schema(), *expected_schema); + + AssertReadNext(reader, &actual_batch); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + EXPECT_EQ(reader->bytes_read(), 32); + + AssertReadNext(reader, &actual_batch); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + EXPECT_EQ(reader->bytes_read(), 64); + + 
ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch)); + EXPECT_EQ(reader->bytes_read(), 64); + AssertReadEnd(reader); +} + +TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) { + std::string test_json = + Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})", + R"({"s": "foo", "t": "2022-01-01", "b": true})"}, + "\n"); + + FieldVector expected_fields = {field("s", utf8()), field("t", utf8())}; + std::shared_ptr<Schema> expected_schema = schema(expected_fields); + std::shared_ptr<RecordBatch> expected_batch; + std::shared_ptr<RecordBatch> actual_batch; + std::shared_ptr<StreamingReader> reader; + + parse_options_.explicit_schema = expected_schema; + parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore; + read_options_.block_size = 48; + + ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json)); + ASSERT_NE(reader->schema(), nullptr); + ASSERT_EQ(*reader->schema(), *expected_schema); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])"); + AssertReadNext(reader, &actual_batch); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + EXPECT_EQ(reader->bytes_read(), 32); + + expected_batch = + RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])"); + AssertReadNext(reader, &actual_batch); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + EXPECT_EQ(reader->bytes_read(), 64); + + AssertReadNext(reader, &actual_batch); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + EXPECT_EQ(reader->bytes_read(), 106); + AssertReadEnd(reader); +} + +TEST_P(StreamingReaderTest, InferredSchema) { + auto test_json = Join( + { + R"({"a": 0, "b": "foo" })", + R"({"a": 1, "c": true })", + R"({"a": 2, "d": "2022-01-01"})", + }, + "\n", true); + + std::shared_ptr<StreamingReader> reader; + std::shared_ptr<Schema> expected_schema; + std::shared_ptr<RecordBatch> expected_batch; + std::shared_ptr<RecordBatch> actual_batch; + + FieldVector fields = {field("a", 
int64()), field("b", utf8())}; + parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType; + parse_options_.explicit_schema = nullptr; + + // Schema derived from the first line + expected_schema = schema(fields); + + read_options_.block_size = 32; + ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json)); + ASSERT_NE(reader->schema(), nullptr); + ASSERT_EQ(*reader->schema(), *expected_schema); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])"); + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 28); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 1, "b": null}])"); + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 56); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 2, "b": null}])"); + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 84); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + // Schema derived from the first 2 lines + fields.push_back(field("c", boolean())); + expected_schema = schema(fields); + + read_options_.block_size = 64; + ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json)); + ASSERT_NE(reader->schema(), nullptr); + ASSERT_EQ(*reader->schema(), *expected_schema); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([ + {"a": 0, "b": "foo", "c": null}, + {"a": 1, "b": null, "c": true} + ])"); + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 56); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([ + {"a": 2, "b": null, "c": null} + ])"); + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 84); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + // Schema derived from all 3 lines + fields.push_back(field("d", 
timestamp(TimeUnit::SECOND))); + expected_schema = schema(fields); + + read_options_.block_size = 96; + ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json)); + ASSERT_NE(reader->schema(), nullptr); + ASSERT_EQ(*reader->schema(), *expected_schema); + + expected_batch = RecordBatchFromJSON(expected_schema, R"([ + {"a": 0, "b": "foo", "c": null, "d": null}, + {"a": 1, "b": null, "c": true, "d": null}, + {"a": 2, "b": null, "c": null, "d": "2022-01-01"} + ])"); + AssertReadNext(reader, &actual_batch); + EXPECT_EQ(reader->bytes_read(), 84); + ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch); + + AssertReadEnd(reader); +} + +TEST_P(StreamingReaderTest, AsyncReentrancy) { + constexpr int kNumRows = 16; + constexpr double kIoLatency = 1e-2; + + auto expected = GenerateTestCase(kNumRows); + parse_options_.explicit_schema = expected.schema; + parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error; + read_options_.block_size = expected.block_size; + + std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 1); + ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency)); + EXPECT_EQ(reader->bytes_read(), 0); + for (auto& future : futures) { + future = reader->ReadNextAsync(); + } + + ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures))); + EXPECT_EQ(reader->bytes_read(), expected.json_size); + ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results))); + EXPECT_EQ(batches.back(), nullptr); + batches.pop_back(); + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); + ASSERT_TABLES_EQUAL(*expected.table, *table); +} + +TEST_P(StreamingReaderTest, FuturesOutliveReader) { + constexpr int kNumRows = 16; + constexpr double kIoLatency = 1e-2; + + auto expected = GenerateTestCase(kNumRows); + parse_options_.explicit_schema = expected.schema; + parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error; + read_options_.block_size = expected.block_size; + + 
auto stream = MakeTestStream(expected.json, kIoLatency); + std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches); + std::weak_ptr<StreamingReader> weak_reader; + { + ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream)); + weak_reader = reader; + EXPECT_EQ(reader->bytes_read(), 0); + for (auto& future : futures) { + future = reader->ReadNextAsync(); + } + } + + auto all_future = All(std::move(futures)); + AssertNotFinished(all_future); + EXPECT_EQ(weak_reader.use_count(), 0); Review Comment: This `weak_reader.use_count()` check doesn't serve any real purpose now. It was written for an earlier implementation where circular references were possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org