alamb commented on a change in pull request #1029:
URL: https://github.com/apache/arrow-datafusion/pull/1029#discussion_r712406143



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -308,8 +310,38 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
 
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> 
{
-    let stream = execute_stream(plan).await?;
-    common::collect(stream).await
+    let stream = execute_stream(plan.clone()).await?;
+    let any_plan = plan.as_any().downcast_ref::<UnionExec>();
+    match any_plan {
+        Some(&UnionExec { .. }) => {
+            let record_batches = common::collect(stream).await;
+            if any_plan.unwrap().is_all() {
+                return record_batches;
+            }
+            let mut new_record_batches = Vec::new();
+            let mut vec_str = Vec::new();
+            for record_batch in record_batches.unwrap() {
+                for _row in 0..record_batch.num_rows() {
+                    let mut array_str = String::new();
+                    let mut vec_array = Vec::new();
+                    for col in 0..record_batch.num_columns() {
+                        let column = record_batch.column(col);
+                        array_str += &*array_value_to_string(column, 1)?;
+                        vec_array.push(column.clone());
+                    }
+                    if vec_str.contains(&array_str) {

Review comment:
       I agree with @Dandandan -- specifically, I think you could build a plan
that implements `SELECT x FROM foo UNION SELECT x FROM bar` by effectively
creating the same plan as
   
   `SELECT DISTINCT x FROM (SELECT x FROM foo UNION ALL SELECT x FROM bar)`
   
   You can see the plan that DataFusion generates for this query by running `EXPLAIN ANALYZE`:
   
   ```
   explain analyze select distinct x from ( select 1 as x UNION ALL select 1 as x);
   +-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan |
   +-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=81.227µs] |
   |                   |   HashAggregateExec: mode=FinalPartitioned, gby=[x@0 as x], aggr=[], metrics=[output_rows=1, elapsed_compute=314.837µs] |
   |                   |     CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=2, elapsed_compute=1.88962ms] |
   |                   |       RepartitionExec: partitioning=Hash([Column { name: "x", index: 0 }], 16), metrics=[repart_time{inputPartition=0}=7.166192ms, send_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=11.674789ms] |
   |                   |         HashAggregateExec: mode=Partial, gby=[x@0 as x], aggr=[], metrics=[output_rows=2, elapsed_compute=846.656µs] |
   |                   |           UnionExec, metrics=[output_rows=2, elapsed_compute=7.633152ms] |
   |                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[send_time{inputPartition=0}=9.492µs, repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=84.573µs] |
   |                   |               ProjectionExec: expr=[1 as x], metrics=[output_rows=1, elapsed_compute=20.765µs] |
   |                   |                 EmptyExec: produce_one_row=true, metrics=[] |
   |                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetch_time{inputPartition=0}=78.872µs, send_time{inputPartition=0}=9.225µs, repart_time{inputPartition=0}=NOT RECORDED] |
   |                   |               ProjectionExec: expr=[1 as x], metrics=[output_rows=1, elapsed_compute=18.752µs] |
   |                   |                 EmptyExec: produce_one_row=true, metrics=[] |
   +-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   (i.e., use a `UnionExec` followed by a `HashAggregateExec` that groups on all output columns, which removes the duplicates)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to