vibhatha commented on a change in pull request #12033:
URL: https://github.com/apache/arrow/pull/12033#discussion_r788726816



##########
File path: docs/source/cpp/streaming_execution.rst
##########
@@ -305,3 +305,497 @@ Datasets may be scanned multiple times; just make multiple scan
 nodes from that dataset. (Useful for a self-join, for example.)
 Note that producing two scan nodes like this will perform all
 reads and decodes twice.
+
+Constructing ``ExecNode`` using Options
+=======================================
+
+An execution plan can be used to construct a variety of queries.
+Such queries are assembled from a set of building blocks,
+each of which is an :class:`ExecNode`. These nodes provide
+operations such as filtering, projection, and joins.
+
+This is the list of operations associated with the execution plan:
+
+.. list-table:: Operations and Options
+   :widths: 50 50
+   :header-rows: 1
+
+   * - Operation
+     - Options
+   * - ``source``
+     - :class:`arrow::compute::SourceNodeOptions`
+   * - ``filter``
+     - :class:`arrow::compute::FilterNodeOptions`
+   * - ``project``
+     - :class:`arrow::compute::ProjectNodeOptions`
+   * - ``aggregate``
+     - :class:`arrow::compute::AggregateNodeOptions`
+   * - ``sink``
+     - :class:`arrow::compute::SinkNodeOptions`
+   * - ``consuming_sink``
+     - :class:`arrow::compute::ConsumingSinkNodeOptions`
+   * - ``order_by_sink``
+     - :class:`arrow::compute::OrderBySinkNodeOptions`
+   * - ``select_k_sink``
+     - :class:`arrow::compute::SelectKSinkNodeOptions`
+   * - ``scan``
+     - :class:`arrow::dataset::ScanNodeOptions`
+   * - ``hash_join``
+     - :class:`arrow::compute::HashJoinNodeOptions`
+   * - ``write``
+     - :class:`arrow::dataset::WriteNodeOptions`
+   * - ``union``
+     - N/A
+
+
+.. _stream_execution_source_docs:
+
+``source``
+----------
+
+A ``source`` operation can be considered the entry point for creating a streaming execution plan.
+:class:`arrow::compute::SourceNodeOptions` are used to create the ``source`` operation.  The
+``source`` operation is the most generic and flexible type of source currently available, but it can
+be quite tricky to configure.  To process data from files, the ``scan`` operation is likely a simpler choice.
+The source node requires a function that can be called to poll for more data.  This
+function should take no arguments and should return an
+``arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>``.
+This function might be reading a file, iterating through an in-memory structure, or receiving data
+from a network connection.  The Arrow library refers to these functions as ``arrow::AsyncGenerator``,
+and there are a number of utilities for working with them.  For this example we use
+a vector of record batches that we've already stored in memory.
+In addition, the schema of the data must be known up front.  Arrow's streaming execution
+engine must know the schema of the data at each stage of the execution graph before any
+processing has begun.  This means we must supply the schema for a source node separately
+from the data itself.
+
+Struct to hold the data generator definition:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: BatchesWithSchema Definition)
+  :end-before: (Doc section: BatchesWithSchema Definition)
+  :linenos:
+  :lineno-match:
+
+Generating sample Batches for computation:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: MakeBasicBatches Definition)
+  :end-before: (Doc section: MakeBasicBatches Definition)
+  :linenos:
+  :lineno-match:
+
+Example of using ``source`` (usage of sink is explained in detail in :ref:`sink<stream_execution_sink_docs>`):
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Source Example)
+  :end-before: (Doc section: Source Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_filter_docs:
+
+``filter``
+----------
+
+The ``filter`` operation, as the name suggests, provides an option to define data filtering
+criteria. It selects rows matching a given expression.
+Filters can be written using :class:`arrow::compute::Expression`.
+For example, if we wish to keep rows where the value of column ``a``
+is greater than 3, then we can use the following expression::
+
+  // a > 3
+  arrow::compute::Expression filter_opt = arrow::compute::greater(
+                                arrow::compute::field_ref("a"), 
+                                arrow::compute::literal(3));
+
+Using this option, the filter node can be constructed as follows::
+
+  // creating filter node
+  arrow::compute::ExecNode* filter;
+  ARROW_ASSIGN_OR_RAISE(filter, arrow::compute::MakeExecNode("filter",
+                        // plan
+                        plan.get(),
+                        // previous node
+                        {scan},
+                        // filter node options
+                        arrow::compute::FilterNodeOptions{filter_opt}));
+
+Filter example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Filter Example)
+  :end-before: (Doc section: Filter Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_project_docs:
+
+``project``
+-----------
+
+The ``project`` operation rearranges, deletes, transforms, and creates columns.
+Each output column is computed by evaluating an expression
+against the source record batch. This is exposed via
+:class:`arrow::compute::ProjectNodeOptions`, which requires
+an :class:`arrow::compute::Expression` and a name for each of the output columns (if names are not
+provided, the string representations of the expressions will be used).
+
+Sample Expression for projection::
+
+  // a * 2 (multiply values in a column by 2)
+  arrow::compute::Expression a_times_2 = arrow::compute::call("multiply", 
+            {arrow::compute::field_ref("a"), arrow::compute::literal(2)});
+
+
+Creating a project node::
+
+  arrow::compute::ExecNode* project;
+  ARROW_ASSIGN_OR_RAISE(project,
+      arrow::compute::MakeExecNode("project",
+      // plan
+      plan.get(),
+      // previous node
+      {scan},
+      // project node options
+      arrow::compute::ProjectNodeOptions{{a_times_2}}));
+
+Project example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Project Example)
+  :end-before: (Doc section: Project Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_aggregate_docs:
+
+``aggregate``
+-------------
+
+The ``aggregate`` operation provides various data aggregation options.
+:class:`arrow::compute::AggregateNodeOptions` is used to
+define the aggregation criteria. An aggregate node can first group data by
+one or more key columns, or the keys can be left off to compute aggregates
+across the entire dataset.  Each aggregate node can compute any number of
+aggregation functions.  Each aggregation function will be applied to every
+field specified as a target.  The aggregation functions can be
+selected from :ref:`this list of aggregation functions <aggregation-option-list>`.
+Note: This node is a "pipeline breaker" and will fully materialize the dataset in memory.
+In the future, spillover mechanisms will be added which should alleviate this constraint.
+
+An example for creating an aggregate node::
+
+  arrow::compute::CountOptions options(arrow::compute::CountOptions::ONLY_VALID);
+
+  auto aggregate_options = arrow::compute::AggregateNodeOptions{
+      /*aggregates=*/{{"hash_count", &options}},
+      /*targets=*/{"a"},
+      /*names=*/{"count(a)"},
+      /*keys=*/{"b"}};
+
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * aggregate,
+                        cp::MakeExecNode("aggregate", plan.get(), {source},
+                                         aggregate_options));
+
+Aggregate example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Aggregate Example)
+  :end-before: (Doc section: Aggregate Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_sink_docs:
+
+``sink``
+--------
+
+The ``sink`` operation provides output and is the final node of a streaming
+execution definition. The :class:`arrow::compute::SinkNodeOptions` interface is used to pass
+the required options. Similar to the source operator, the sink operator exposes the output
+with a function that returns a record batch future each time it is called.  It is expected the
+caller will repeatedly call this function until the generator function is exhausted (returns
+``arrow::util::nullopt``).  If this function is not called often enough then record batches
+will accumulate in memory.  An execution plan should only have one
+"terminal" node (one sink node).  An execution plan may "finish" by marking
+``exec_plan->finished()`` as complete before the sink generator is fully consumed, and the
+execution plan can be safely destroyed without harming the sink generator (which will hold
+references to the unconsumed batches).
+
+Example::
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  arrow::compute::ExecNode* sink;
+
+  ARROW_ASSIGN_OR_RAISE(sink, arrow::compute::MakeExecNode("sink", plan.get(), {source},
+                                                arrow::compute::SinkNodeOptions{&sink_gen}));
+
+As a part of the Source Example, the Sink operation is also included:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Source Example)
+  :end-before: (Doc section: Source Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_consuming_sink_docs:
+
+``consuming_sink``
+------------------
+
+The ``consuming_sink`` operator is a sink operation whose consumption happens within the
+execution plan (i.e. the exec plan should not complete until the consumption has completed).
+Unlike the ``sink`` node, this node takes in a callback function that is expected to consume the
+batch.  Once this callback has finished, the execution plan will no longer hold any reference to
+the batch.
+The consuming function may be called before a previous invocation has completed.  If the consuming
+function does not run quickly enough then many concurrent executions could pile up, blocking the
+CPU thread pool.  The execution plan will not be marked finished until all consuming function callbacks
+have been completed.
+Once all batches have been delivered, the execution plan will wait for the ``finish`` future to complete
+before marking the execution plan finished.  This allows for workflows where the consumption function
+converts batches into async tasks (this is currently done internally for the dataset write node).
+
+Example::
+
+  // define a Custom SinkNodeConsumer
+  std::atomic<uint32_t> batches_seen{0};
+  arrow::Future<> finish = arrow::Future<>::Make();
+  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
+
+      CustomSinkNodeConsumer(std::atomic<uint32_t> *batches_seen, arrow::Future<> finish):
+          batches_seen(batches_seen), finish(std::move(finish)) {}
+
+      // Consumption logic can be written here
+      arrow::Status Consume(cp::ExecBatch batch) override {
+          // data can be consumed in the expected way
+          // transfer to another system or just do some work
+          // and write to disk
+          (*batches_seen)++;
+          return arrow::Status::OK();
+      }
+
+      arrow::Future<> Finish() override { return finish; }
+
+      std::atomic<uint32_t> *batches_seen;
+      arrow::Future<> finish;
+  };
+
+  std::shared_ptr<CustomSinkNodeConsumer> consumer =
+          std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
+
+  arrow::compute::ExecNode *consuming_sink;
+
+  ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
+      {source}, cp::ConsumingSinkNodeOptions(consumer)));
+
+
+Consuming-Sink example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: ConsumingSink Example)
+  :end-before: (Doc section: ConsumingSink Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_order_by_sink_docs:
+
+``order_by_sink``
+-----------------
+
+The ``order_by_sink`` operation is an extension to the ``sink`` operation.
+This operation provides the ability to guarantee the ordering of the
+stream by providing the :class:`arrow::compute::OrderBySinkNodeOptions`.
+Here the :class:`arrow::compute::SortOptions` are provided to define which columns
+are used for sorting and whether to sort by ascending or descending values.
+Note: This node is a "pipeline breaker" and will fully materialize the dataset in memory.
+In the future, spillover mechanisms will be added which should alleviate this constraint.
+
+Example::
+
+  arrow::compute::ExecNode *sink;
+
+  ARROW_ASSIGN_OR_RAISE(sink,
+      arrow::compute::MakeExecNode("order_by_sink", plan.get(),
+          {source},
+          arrow::compute::OrderBySinkNodeOptions{
+              /*sort_options*/arrow::compute::SortOptions{
+                  {arrow::compute::SortKey{
+                      // Column key(s) to order by and how to order by these sort keys.
+                      "a",
+                      // Sort Order
+                      arrow::compute::SortOrder::Descending}}},
+              &sink_gen}));
+
+
+Order-By-Sink example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: OrderBySink Example)
+  :end-before: (Doc section: OrderBySink Example)
+  :linenos:
+  :lineno-match:
+
+
+.. _stream_execution_select_k_docs:
+
+``select_k_sink``
+-----------------
+
+The ``select_k_sink`` operation selects the top or bottom ``k`` elements of its input.
+It is configured with :class:`arrow::compute::SelectKOptions` and is defined
+using the :struct:`OrderBySinkNode` definition. This option returns a sink node that receives
+inputs and then computes top_k/bottom_k.
+Note: This node is a "pipeline breaker" and will fully materialize the input in memory.
+In the future, spillover mechanisms will be added which should alleviate this constraint.
+
+Create SelectK Option::
+
+  arrow::compute::SelectKOptions options = arrow::compute::SelectKOptions::TopKDefault(
+              /*k=*/2, {"i32"});
+
+  ARROW_ASSIGN_OR_RAISE(
+    arrow::compute::ExecNode * k_sink_node,
+    arrow::compute::MakeExecNode("select_k_sink",
+      plan.get(), {source},
+      arrow::compute::SelectKSinkNodeOptions{options, &sink_gen}));
+
+
+SelectK example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: KSelect Example)
+  :end-before: (Doc section: KSelect Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_scan_docs:
+
+``scan``
+---------
+
+``scan`` is an operation used to load and process datasets.  It should be preferred over the
+more generic ``source`` node when your input is a dataset.  The behavior is defined using
+:class:`arrow::dataset::ScanNodeOptions`.  More information on datasets and the various
+scan options can be found in :ref:`dataset<cpp-dataset-reading>`.
+
+This node is capable of applying pushdown filters to the file readers, which reduces
+the amount of data that needs to be read.  This means you may supply the same
+filter expression to the scan node that you also supply to the filter node, because
+the filtering is done in two different places.
+
+Creating a Scan `ExecNode`::
+
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  options->use_async = true; 
+  options->projection = Materialize({});  // create empty projection
+
+  // construct the scan node
+  cp::ExecNode* scan;
+  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
+
+  ARROW_ASSIGN_OR_RAISE(scan,
+                          cp::MakeExecNode("scan", plan.get(), {}, 
+                            scan_node_options));
+
+Scan example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Scan Example)
+  :end-before: (Doc section: Scan Example)
+  :linenos:
+  :lineno-match:
+
+.. _stream_execution_hashjoin_docs:
+
+``hash_join``
+-------------
+
+The ``hash_join`` operation provides the relational algebra join operation using a hash-based
+algorithm. :class:`arrow::compute::HashJoinNodeOptions` contains the options required to
+define a join. ``JoinType`` can be one of LEFT_SEMI, RIGHT_SEMI, LEFT_ANTI, RIGHT_ANTI,
+INNER, LEFT_OUTER, RIGHT_OUTER and FULL_OUTER. The join key (the column or columns to join on)
+and the output suffixes can also be set via the join options.

Review comment:
       Perhaps

   "The hash_join supports left/right/full semi/anti/outer joins.
   Also, the join key (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like "_x"
   which is appended to column names duplicated in both the left and right
   relations) can be set via the join options." ?
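
To make the suffix wording concrete, here is a minimal stand-alone sketch of the idea in plain C++. It is only an illustration and does not use or reproduce Arrow's implementation: `JoinedColumnNames` and the `_l` / `_r` suffixes are hypothetical names chosen for this comment. It shows a suffix being appended only to column names that occur in both the left and right relations.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical helper (not an Arrow API): computes the output column names of
// a join. A suffix is appended only to names that appear in both inputs.
std::vector<std::string> JoinedColumnNames(const std::vector<std::string>& left,
                                           const std::vector<std::string>& right,
                                           const std::string& left_suffix,
                                           const std::string& right_suffix) {
  // With identical suffixes, duplicated names would still collide.
  assert(left_suffix != right_suffix);

  const std::unordered_set<std::string> left_set(left.begin(), left.end());
  const std::unordered_set<std::string> right_set(right.begin(), right.end());

  std::vector<std::string> out;
  out.reserve(left.size() + right.size());
  // Left columns come first; rename only those that also exist on the right.
  for (const auto& name : left) {
    out.push_back(right_set.count(name) ? name + left_suffix : name);
  }
  // Then right columns; rename only those that also exist on the left.
  for (const auto& name : right) {
    out.push_back(left_set.count(name) ? name + right_suffix : name);
  }
  return out;
}
```

For example, `JoinedColumnNames({"id", "x"}, {"id", "y"}, "_l", "_r")` yields `id_l`, `x`, `id_r`, `y`: only the duplicated `id` column is renamed.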



