felipeblazing commented on a change in pull request #11426:
URL: https://github.com/apache/arrow/pull/11426#discussion_r745712534



##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  explicit DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    bool pushed = false;
+    auto resources = context_->memory_resources();
+    for (auto memory_resource : resources->memory_resources()) {
+      auto memory_used = memory_resource->memory_used();
+      if (memory_used < memory_resource->memory_limit()) {
+        ARROW_ASSIGN_OR_RAISE(auto data_holder, 
memory_resource->GetDataHolder(batch));
+        this->producer_.Push(std::move(data_holder));
+        pushed = true;
+        break;
+      }
+    }
+    if (!pushed) {
+      return Status::Invalid("No memory resource registered at all in the 
exec_context");
+    }
+    return Status::OK();
+  }
+  AsyncGenerator<std::shared_ptr<DataHolder>> generator() { return gen_; }
+
+ public:
+  ExecContext* context_;
+  PushGenerator<std::shared_ptr<DataHolder>> gen_;
+  PushGenerator<std::shared_ptr<DataHolder>>::Producer producer_;
+};
+
+class DataHolderNode : public ExecNode {
+ public:
+  DataHolderNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> 
input_labels,
+                 std::shared_ptr<Schema> output_schema, int num_outputs)
+      : ExecNode(plan, std::move(inputs), input_labels, 
std::move(output_schema),
+                 /*num_outputs=*/num_outputs) {
+    executor_ = plan->exec_context()->executor();
+
+    data_holder_manager_ =
+        
::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());
+
+    auto status = task_group_.AddTask([this]() -> Result<Future<>> {

Review comment:
       I don't think this is how we should push into the next node. We need a 
way for a node to be able to request this data from the DataHolder. Setting 
up a task that basically loops through inputs to push them forward doesn't 
allow for intelligent scheduling of these things in the future. 
   
   Two options I see are either
   
   1. Have the nodes that follow DataHolderManager pull from the 
DataHolderManager. This is relatively straightforward but lacks control.
   2. Have some kind of Simple Scheduler which pulls from the DataHolderManagers 
that have accumulated the most HeldData and pushes those tasks forward. This is 
a bit more work but provides more value in my opinion.
   
   

##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/compute/exec.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/make_unique.h"
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <unordered_map>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/feather.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
+#include "arrow/io/file.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  static const char* MemoryLevelNames[] = 
{ARROW_STRINGIFY(MemoryLevel::kDiskLevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kCPULevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kGPULevel)};
+
+  return MemoryLevelNames[static_cast<int>(memory_level)];
+}
+
+std::string MemoryResource::ToString() const { return 
MemoryLevelName(memory_level_); }
+
+class CPUDataHolder : public DataHolder {
+ public:
+  explicit CPUDataHolder(const std::shared_ptr<RecordBatch>& record_batch)
+      : DataHolder(MemoryLevel::kCPULevel), 
record_batch_(std::move(record_batch)) {}
+
+  Result<ExecBatch> Get() override { return ExecBatch(*record_batch_); }
+
+ private:
+  std::shared_ptr<RecordBatch> record_batch_;
+};
+
+namespace {
+
+std::string RandomString(std::size_t length) {
+  const std::string characters =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  std::uniform_int_distribution<> distribution(0, characters.size() - 1);
+  std::string random_string;
+  for (std::size_t i = 0; i < length; ++i) {
+    random_string += characters[distribution(generator)];
+  }
+  return random_string;
+}
+
+}  // namespace
+
+Status StoreRecordBatch(const std::shared_ptr<RecordBatch>& record_batch,
+                        const std::shared_ptr<fs::FileSystem>& filesystem,
+                        const std::string& file_path) {
+  auto output = filesystem->OpenOutputStream(file_path).ValueOrDie();
+  auto writer =
+      arrow::ipc::MakeFileWriter(output.get(), 
record_batch->schema()).ValueOrDie();
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  return writer->Close();
+}
+Result<std::shared_ptr<RecordBatch>> RecoverRecordBatch(
+    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& 
file_path) {
+  ARROW_ASSIGN_OR_RAISE(auto input, filesystem->OpenInputFile(file_path));
+  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::feather::Reader::Open(input));
+  std::shared_ptr<Table> table;
+  ARROW_RETURN_NOT_OK(reader->Read(&table));
+  TableBatchReader batch_iter(*table);
+  ARROW_ASSIGN_OR_RAISE(auto batch, batch_iter.Next());
+  return batch;
+}
+
+class DiskDataHolder : public DataHolder {
+ public:
+  DiskDataHolder(const std::shared_ptr<RecordBatch>& record_batch,
+                 MemoryPool* memory_pool)
+      : DataHolder(MemoryLevel::kDiskLevel), memory_pool_(memory_pool) {
+    std::string root_path;
+    std::string file_name = "data-holder-temp-" + RandomString(64) + 
".feather";
+
+    filesystem_ =
+        arrow::fs::FileSystemFromUri(cache_storage_root_path, 
&root_path).ValueOrDie();
+
+    file_path_ = root_path + file_name;
+    status_ = StoreRecordBatch(record_batch, filesystem_, file_path_);

Review comment:
       So there are a few options here but what we are saying is that we have 
these different DataHolders that are created from record_batch. We could make a 
function which is like
   
   `template <typename DataHolderType> make_data_holder_from_batch(...)`
   
   And it would invoke the constructor with something like the filesystem and 
path pointing to a file that has already persisted the record batch. This would 
allow the constructor to be simpler while still being able to have a common API 
for creating the data_holders from a record_batch.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node_test.cc
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <random>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+
+using testing::UnorderedElementsAreArray;
+
+namespace arrow {
+namespace compute {
+
+struct TestDataHolderNode : public ::testing::Test {
+  static constexpr int kNumBatches = 10;
+
+  TestDataHolderNode() : rng_(0) {}
+
+  std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+    static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+        arrow::null(),    arrow::boolean(), arrow::int8(),    arrow::int16(),
+        arrow::int32(),   arrow::int64(),   arrow::float16(), arrow::float32(),
+        arrow::float64(), arrow::utf8(),    arrow::binary(),  arrow::date32()};
+
+    std::vector<std::shared_ptr<Field>> fields(num_inputs);
+    std::default_random_engine gen(42);
+    std::uniform_int_distribution<int> types_dist(
+        0, static_cast<int>(some_arrow_types.size()) - 1);
+    for (size_t i = 0; i < num_inputs; i++) {
+      int random_index = types_dist(gen);
+      auto col_type = some_arrow_types.at(random_index);
+      fields[i] =
+          field("column_" + std::to_string(i) + "_" + col_type->ToString(), 
col_type);
+    }
+    return schema(fields);
+  }
+
+  void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+                                 size_t num_batches, BatchesWithSchema* 
out_batches,
+                                 int multiplicity = 1, int64_t batch_size = 4) 
{
+    if (num_batches == 0) {
+      auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+      out_batches->batches.push_back(empty_record_batch);
+    } else {
+      for (size_t j = 0; j < num_batches; j++) {
+        out_batches->batches.push_back(
+            ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+      }
+    }
+
+    size_t batch_count = out_batches->batches.size();
+    for (int repeat = 1; repeat < multiplicity; ++repeat) {
+      for (size_t i = 0; i < batch_count; ++i) {
+        out_batches->batches.push_back(out_batches->batches[i]);
+      }
+    }
+    out_batches->schema = schema;
+  }

Review comment:
       I guess instead of random we could just be comprehensive, so that we 
cover all the types listed here every time.

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+#include <array>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+enum class MemoryLevel : int { kGpuLevel, kCpuLevel, kDiskLevel, kNumLevels };

Review comment:
       Do we want the memory levels to be more extensible than the ones that we 
statically define? I think this would be fine for a first pass, but we should 
think about it later on, when maybe the user wants to define their own memory 
levels.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, 
MemoryLevel::kDiskLevel};
+
+    for (auto id : all_memory_levels) {
+      auto resources = context_->memory_resources();
+
+      auto memory_resource_result = resources->memory_resource(id);
+      if (memory_resource_result.ok()) {
+        auto memory_resource = memory_resource_result.ValueOrDie();
+        auto memory_to_use = memory_resource->memory_used();
+        if (memory_to_use < memory_resource->memory_limit()) {
+          ARROW_ASSIGN_OR_RAISE(auto data_holder, 
memory_resource->GetDataHolder(batch));
+          this->producer_.Push(std::move(data_holder));
+          break;
+        }
+      }
+    }
+    return Status::OK();
+  }
+  AsyncGenerator<std::shared_ptr<DataHolder>> generator() { return gen_; }
+
+ public:
+  ExecContext* context_;
+  PushGenerator<std::shared_ptr<DataHolder>> gen_;
+  PushGenerator<std::shared_ptr<DataHolder>>::Producer producer_;
+};
+
+class DataHolderNode : public ExecNode {
+ public:
+  DataHolderNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> 
input_labels,
+                 std::shared_ptr<Schema> output_schema, int num_outputs)
+      : ExecNode(plan, std::move(inputs), input_labels, 
std::move(output_schema),
+                 /*num_outputs=*/num_outputs) {
+    executor_ = plan->exec_context()->executor();
+
+    data_holder_manager_ =
+        
::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());
+
+    auto status = task_group_.AddTask([this]() -> Result<Future<>> {
+      ARROW_DCHECK(executor_ != nullptr);
+      return executor_->Submit(this->stop_source_.token(), [this] {
+        auto generator = this->data_holder_manager_->generator();
+        auto iterator = MakeGeneratorIterator(std::move(generator));

Review comment:
       Maybe we can use a condition variable that waits until there are things 
to iterate on. When a generator has something to push it can notify that cv.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to