zanmato1984 commented on code in PR #44470:
URL: https://github.com/apache/arrow/pull/44470#discussion_r1959164405
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
Review Comment:
```suggestion
ASSERT_OK(delay_func->AddKernel(delay_kernel));
ASSERT_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
```
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
Review Comment:
```suggestion
// Test for GH-26818
```
##########
cpp/src/arrow/dataset/write_node_test.cc:
##########
@@ -170,5 +176,83 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}
+TEST_F(SimpleWriteNodeTest, SequenceOutput) {
+ // Test for ARROW-26818
Review Comment:
```suggestion
// Test for GH-26818
```
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+ for (bool preserve_order : {true, false}) {
+ ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+ ARROW_CHECK_OK(scanner_builder->UseThreads(true));
+ ARROW_CHECK_OK(
+ scanner_builder->Filter(compute::call("delay",
{compute::field_ref("f0")})));
+ ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ write_options.filesystem = fs;
+ write_options.preserve_order = preserve_order;
+
+ ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
Review Comment:
```suggestion
ASSERT_OK(reader.ReadNext(&batch));
```
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+ for (bool preserve_order : {true, false}) {
+ ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+ ARROW_CHECK_OK(scanner_builder->UseThreads(true));
+ ARROW_CHECK_OK(
Review Comment:
```suggestion
ASSERT_OK(scanner_builder->UseThreads(true));
ASSERT_OK(
```
##########
cpp/src/arrow/dataset/write_node_test.cc:
##########
@@ -170,5 +176,83 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}
+TEST_F(SimpleWriteNodeTest, SequenceOutput) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ constexpr int kRowsPerBatch = 16;
+ constexpr int kNumBatches = 32;
+ constexpr random::SeedType kSeed = 42;
+ constexpr int kJitterMod = 4;
+ acero::RegisterTestNodes();
+
+ // Create an input table
+ std::shared_ptr<Table> table =
+ gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch,
kNumBatches);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+ auto scan_options = std::make_shared<ScanOptions>();
+ scan_options->use_threads = true;
+
+ for (bool preserve_order : {true, false}) {
+ auto scanner_builder = std::make_shared<ScannerBuilder>(dataset,
scan_options);
+ EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+ auto exprs = scan_options->projection.call()->arguments;
+ auto names = checked_cast<const compute::MakeStructOptions*>(
+ scan_options->projection.call()->options.get())
+ ->field_names;
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ dataset::WriteNodeOptions write_options(fs_write_options_);
+ write_options.write_options.file_write_options =
format->DefaultWriteOptions();
+ write_options.write_options.base_dir = "root";
+ write_options.write_options.partitioning =
+ std::make_shared<HivePartitioning>(schema({}));
+ write_options.write_options.basename_template = "{i}.feather";
+ write_options.write_options.filesystem = fs;
+ write_options.write_options.preserve_order = preserve_order;
+
+ // test plan of FileSystemDataset::Write with a jitter node that
guarantees exec
+ // batches are out of order
+ acero::Declaration plan = acero::Declaration::Sequence({
+ {"scan",
+ ScanNodeOptions{dataset, scanner->options(),
/*require_sequenced_output=*/false,
+ /*implicit_ordering=*/true}},
+ {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
+ {"project", acero::ProjectNodeOptions{std::move(exprs),
std::move(names)}},
+ {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
+ {"write", write_options},
+ });
+
+ ASSERT_OK(DeclarationToStatus(plan));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
Review Comment:
```suggestion
ASSERT_OK(reader.ReadNext(&batch));
```
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
Review Comment:
Why not just use `ASSERT_OK`?
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+ for (bool preserve_order : {true, false}) {
+ ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+ ARROW_CHECK_OK(scanner_builder->UseThreads(true));
+ ARROW_CHECK_OK(
+ scanner_builder->Filter(compute::call("delay",
{compute::field_ref("f0")})));
+ ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ write_options.filesystem = fs;
+ write_options.preserve_order = preserve_order;
+
+ ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
+ int32_t prev = -1;
+ int out_of_order = 0;
+ while (batch != nullptr) {
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
+ auto numeric_scalar =
+
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
+ int32_t value = numeric_scalar->value;
Review Comment:
```suggestion
const int32_t* values =
batch->column(0)->data()->GetValues<int32_t>(1);
for (int row = 0; row < batch->num_rows(); ++row) {
int32_t value = values[row];
```
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
Review Comment:
Does this guarantee the unordered-ness in theory? It seems to me the
timing-based delay isn't reliable enough.
##########
cpp/src/arrow/dataset/write_node_test.cc:
##########
@@ -170,5 +176,83 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}
+TEST_F(SimpleWriteNodeTest, SequenceOutput) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ constexpr int kRowsPerBatch = 16;
+ constexpr int kNumBatches = 32;
+ constexpr random::SeedType kSeed = 42;
+ constexpr int kJitterMod = 4;
+ acero::RegisterTestNodes();
+
+ // Create an input table
+ std::shared_ptr<Table> table =
+ gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch,
kNumBatches);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+ auto scan_options = std::make_shared<ScanOptions>();
+ scan_options->use_threads = true;
+
+ for (bool preserve_order : {true, false}) {
+ auto scanner_builder = std::make_shared<ScannerBuilder>(dataset,
scan_options);
+ EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+ auto exprs = scan_options->projection.call()->arguments;
+ auto names = checked_cast<const compute::MakeStructOptions*>(
+ scan_options->projection.call()->options.get())
+ ->field_names;
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ dataset::WriteNodeOptions write_options(fs_write_options_);
+ write_options.write_options.file_write_options =
format->DefaultWriteOptions();
+ write_options.write_options.base_dir = "root";
+ write_options.write_options.partitioning =
+ std::make_shared<HivePartitioning>(schema({}));
+ write_options.write_options.basename_template = "{i}.feather";
+ write_options.write_options.filesystem = fs;
+ write_options.write_options.preserve_order = preserve_order;
+
+ // test plan of FileSystemDataset::Write with a jitter node that
guarantees exec
+ // batches are out of order
+ acero::Declaration plan = acero::Declaration::Sequence({
+ {"scan",
+ ScanNodeOptions{dataset, scanner->options(),
/*require_sequenced_output=*/false,
+ /*implicit_ordering=*/true}},
+ {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
+ {"project", acero::ProjectNodeOptions{std::move(exprs),
std::move(names)}},
+ {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
+ {"write", write_options},
+ });
+
+ ASSERT_OK(DeclarationToStatus(plan));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
+ int32_t prev = -1;
+ auto out_of_order = false;
+ while (batch != nullptr) {
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
Review Comment:
Same as my other comment about using `GetScalar`.
##########
cpp/src/arrow/dataset/write_node_test.cc:
##########
@@ -170,5 +176,83 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}
+TEST_F(SimpleWriteNodeTest, SequenceOutput) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ constexpr int kRowsPerBatch = 16;
+ constexpr int kNumBatches = 32;
+ constexpr random::SeedType kSeed = 42;
+ constexpr int kJitterMod = 4;
+ acero::RegisterTestNodes();
+
+ // Create an input table
+ std::shared_ptr<Table> table =
+ gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch,
kNumBatches);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+ auto scan_options = std::make_shared<ScanOptions>();
+ scan_options->use_threads = true;
+
+ for (bool preserve_order : {true, false}) {
+ auto scanner_builder = std::make_shared<ScannerBuilder>(dataset,
scan_options);
+ EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+ auto exprs = scan_options->projection.call()->arguments;
+ auto names = checked_cast<const compute::MakeStructOptions*>(
+ scan_options->projection.call()->options.get())
+ ->field_names;
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ dataset::WriteNodeOptions write_options(fs_write_options_);
+ write_options.write_options.file_write_options =
format->DefaultWriteOptions();
+ write_options.write_options.base_dir = "root";
+ write_options.write_options.partitioning =
+ std::make_shared<HivePartitioning>(schema({}));
+ write_options.write_options.basename_template = "{i}.feather";
+ write_options.write_options.filesystem = fs;
+ write_options.write_options.preserve_order = preserve_order;
+
+ // test plan of FileSystemDataset::Write with a jitter node that
guarantees exec
+ // batches are out of order
+ acero::Declaration plan = acero::Declaration::Sequence({
+ {"scan",
+ ScanNodeOptions{dataset, scanner->options(),
/*require_sequenced_output=*/false,
+ /*implicit_ordering=*/true}},
+ {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
+ {"project", acero::ProjectNodeOptions{std::move(exprs),
std::move(names)}},
+ {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
+ {"write", write_options},
+ });
+
+ ASSERT_OK(DeclarationToStatus(plan));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
+ int32_t prev = -1;
+ auto out_of_order = false;
+ while (batch != nullptr) {
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
+ auto numeric_scalar =
+
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
+ int32_t value = numeric_scalar->value;
+ if (value <= prev) {
+ out_of_order = true;
+ }
+ prev = value;
+ }
+ ABORT_NOT_OK(reader.ReadNext(&batch));
Review Comment:
```suggestion
ASSERT_OK(reader.ReadNext(&batch));
```
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+ for (bool preserve_order : {true, false}) {
+ ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+ ARROW_CHECK_OK(scanner_builder->UseThreads(true));
+ ARROW_CHECK_OK(
+ scanner_builder->Filter(compute::call("delay",
{compute::field_ref("f0")})));
+ ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ write_options.filesystem = fs;
+ write_options.preserve_order = preserve_order;
+
+ ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
+ int32_t prev = -1;
+ int out_of_order = 0;
+ while (batch != nullptr) {
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
+ auto numeric_scalar =
+
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
+ int32_t value = numeric_scalar->value;
+ if (value <= prev) {
+ out_of_order++;
+ }
+ prev = value;
+ }
+ ABORT_NOT_OK(reader.ReadNext(&batch));
Review Comment:
```suggestion
ASSERT_OK(reader.ReadNext(&batch));
```
##########
python/pyarrow/dataset.py:
##########
@@ -848,9 +848,9 @@ def _ensure_write_partitioning(part, schema, flavor):
def write_dataset(data, base_dir, *, basename_template=None, format=None,
- partitioning=None, partitioning_flavor=None, schema=None,
- filesystem=None, file_options=None, use_threads=True,
Review Comment:
Is this a breaking change? I.e., it inserts a new parameter in the middle of the
existing parameter list.
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+ compute::ExecResult* out) {
+ const ArraySpan& input = batch[0].array;
+ const uint32_t* input_values = input.GetValues<uint32_t>(1);
+ uint8_t* output_values = out->array_span()->buffers[1].data;
+
+ // Boolean data is stored in 1 bit per value
+ for (int64_t i = 0; i < input.length; ++i) {
+ if (input_values[i] % 16 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ bit_util::SetBitTo(output_values, i, true);
+ }
+
+ return Status::OK();
+}
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+ // Test for ARROW-26818
+ auto format = std::make_shared<IpcFileFormat>();
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
+ auto dataset = std::make_shared<InMemoryDataset>(table);
+
+ // register the scalar function that delays execution for some rows
+ // this guarantees the writing phase sees out-of-order exec batches
+ auto delay_func = std::make_shared<compute::ScalarFunction>("delay",
compute::Arity(1),
+
compute::FunctionDoc());
+ compute::ScalarKernel delay_kernel;
+ delay_kernel.exec = delay;
+ delay_kernel.signature = compute::KernelSignature::Make({uint32()},
boolean());
+ ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
+ ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+ for (bool preserve_order : {true, false}) {
+ ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+ ARROW_CHECK_OK(scanner_builder->UseThreads(true));
+ ARROW_CHECK_OK(
+ scanner_builder->Filter(compute::call("delay",
{compute::field_ref("f0")})));
+ ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ write_options.filesystem = fs;
+ write_options.preserve_order = preserve_order;
+
+ ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+ // Read the file back out and verify the order
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"},
format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset,
dataset_factory->Finish(FinishOptions{}));
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK(scanner_builder->UseThreads(false));
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+ TableBatchReader reader(*actual);
+ std::shared_ptr<RecordBatch> batch;
+ ABORT_NOT_OK(reader.ReadNext(&batch));
+ int32_t prev = -1;
+ int out_of_order = 0;
+ while (batch != nullptr) {
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
+ auto numeric_scalar =
+
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
+ int32_t value = numeric_scalar->value;
Review Comment:
This should be simpler and more efficient than using `GetScalar`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]