avamingli opened a new pull request, #1213:
URL: https://github.com/apache/cloudberry/pull/1213

   This commit improves the handling of UNION operations in CBDB, specifically
   the interaction between Parallel Append and Motion nodes within subqueries.
   Parallel Append is now disabled for UNION operations, because workers
   competing for subnodes produced incorrect results: a task could complete
   prematurely, which led to data loss in plans involving Motion Senders.
   
   To keep UNION queries parallel, we introduce a Parallel-oblivious Append.
   Multiple workers run the Append independently without sharing state, which
   eliminates the coordination issues of the Parallel-aware Append.
   
   Together, these changes make parallel UNION plans produce correct results
   while keeping the performance benefit of parallel execution. In the example
   plan below, a plain Append (not a Parallel Append) sits above the Parallel
   Seq Scans:
   
   ```sql
   select distinct a from t_distinct_0 union select distinct b from t_distinct_0;
                               QUERY PLAN
   ----------------------------------------------------------------------
    Gather Motion 6:1  (slice1; segments: 6)
      ->  HashAggregate
            Group Key: t_distinct_0.a
            ->  Redistribute Motion 6:6  (slice2; segments: 6)
                  Hash Key: t_distinct_0.a
                  Hash Module: 3
                  ->  Append
                        ->  GroupAggregate
                              Group Key: t_distinct_0.a
                              ->  Sort
                                    Sort Key: t_distinct_0.a
                                    ->  Redistribute Motion 6:6  (slice3; segments: 6)
                                          Hash Key: t_distinct_0.a
                                          Hash Module: 3
                                          ->  Streaming HashAggregate
                                                Group Key: t_distinct_0.a
                                                ->  Parallel Seq Scan on t_distinct_0
                        ->  GroupAggregate
                              Group Key: t_distinct_0_1.b
                              ->  Sort
                                    Sort Key: t_distinct_0_1.b
                                    ->  Redistribute Motion 6:6  (slice4; segments: 6)
                                          Hash Key: t_distinct_0_1.b
                                          Hash Module: 3
                                          ->  Streaming HashAggregate
                                                Group Key: t_distinct_0_1.b
                                                ->  Parallel Seq Scan on t_distinct_0 t_distinct_0_1
   ```
   
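   ### Performance
   
   See case[0] below for the test setup. The UNION query takes 3031.346 ms
   without parallelism and 1226.660 ms with 4-way parallelism, about a 2.5x
   speedup.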
   
   #### no-parallel (3031.346 ms)
   
   ```sql
                                                                           QUERY PLAN
   ------------------------------------------------------------------------------------------------------------------------------------------------------------
    Gather Motion 3:1  (slice1; segments: 3) (actual time=3024.080..3032.081 rows=19999 loops=1)
      ->  HashAggregate (actual time=3024.080..3024.080 rows=6722 loops=1)
            Group Key: ao1.b, (count(ao1.a))
            Extra Text: (seg0)   hash table(s): 1; chain length 4.0 avg, 23 max; using 6722 of 8192 buckets; total 0 expansions.
            ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=3008.080..3016.080 rows=6723 loops=1)
                  Hash Key: ao1.b, (count(ao1.a))
                  ->  Append (actual time=3004.080..3016.080 rows=6768 loops=1)
                        ->  Finalize HashAggregate (actual time=3004.080..3008.080 rows=3384 loops=1)
                              Group Key: ao1.b
                              Extra Text: (seg0)   hash table(s): 1; chain length 2.3 avg, 6 max; using 3368 of 8192 buckets; total 1 expansions.
                              ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=2880.077..2996.080 rows=3384 loops=1)
                                    Hash Key: ao1.b
                                    ->  Partial HashAggregate (actual time=3000.080..3004.080 rows=3385 loops=1)
                                          Group Key: ao1.b
                                          Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                          ->  Seq Scan on ao1 (actual time=0.000..1292.034 rows=3466240 loops=1)
                        ->  Finalize HashAggregate (actual time=8.000..8.000 rows=3384 loops=1)
                              Group Key: ao2.b
                              Extra Text: (seg0)   hash table(s): 1; chain length 4.4 avg, 16 max; using 3368 of 4096 buckets; total 0 expansions.
                              ->  Redistribute Motion 3:3  (slice4; segments: 3) (actual time=0.000..4.000 rows=3384 loops=1)
                                    Hash Key: ao2.b
                                    ->  Partial HashAggregate (actual time=3004.080..3004.080 rows=3385 loops=1)
                                          Group Key: ao2.b
                                          Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                          ->  Seq Scan on ao2 (actual time=0.000..1340.036 rows=3466240 loops=1)
    Planning Time: 1.192 ms
      (slice0)    Executor memory: 1267K bytes.
    * (slice1)    Executor memory: 434K bytes avg x 3x(0) workers, 436K bytes max (seg0).  Work_mem: 721K bytes max, 721K bytes wanted.
    * (slice2)    Executor memory: 598K bytes avg x 3x(0) workers, 666K bytes max (seg0).  Work_mem: 721K bytes max, 721K bytes wanted.
    * (slice3)    Executor memory: 878K bytes avg x 3x(0) workers, 880K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
    * (slice4)    Executor memory: 878K bytes avg x 3x(0) workers, 880K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
    Memory used:  128000kB
    Memory wanted:  5260kB
    Optimizer: Postgres query optimizer
    Execution Time: 3031.346 ms
   (40 rows)
   ```
   
   #### 4-parallel UNION (1226.660 ms)
   
   ```sql
    Gather Motion 12:1  (slice1; segments: 12) (actual time=1180.031..1188.031 rows=19999 loops=1)
      ->  HashAggregate (actual time=1168.031..1168.031 rows=1670 loops=1)
            Group Key: ao1.b, (count(ao1.a))
            Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 3 max; using 1647 of 8192 buckets; total 0 expansions.
            ->  Redistribute Motion 12:12  (slice2; segments: 12) (actual time=1164.031..1164.031 rows=1670 loops=1)
                  Hash Key: ao1.b, (count(ao1.a))
                  Hash Module: 3
                  ->  Append (actual time=1148.030..1164.031 rows=1620 loops=1)
                        ->  Finalize HashAggregate (actual time=1148.030..1148.030 rows=810 loops=1)
                              Group Key: ao1.b
                              Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 855 of 4096 buckets; total 0 expansions.
                              ->  Redistribute Motion 12:12  (slice3; segments: 12) (actual time=932.025..1140.030 rows=3240 loops=1)
                                    Hash Key: ao1.b
                                    Hash Module: 3
                                    ->  Partial HashAggregate (actual time=1140.030..1140.030 rows=3385 loops=1)
                                          Group Key: ao1.b
                                          Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                          ->  Parallel Seq Scan on ao1 (actual time=4.000..484.013 rows=900000 loops=1)
                        ->  Finalize HashAggregate (actual time=8.000..8.000 rows=810 loops=1)
                              Group Key: ao2.b
                              Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 855 of 4096 buckets; total 0 expansions.
                              ->  Redistribute Motion 12:12  (slice4; segments: 12) (actual time=0.000..8.000 rows=3240 loops=1)
                                    Hash Key: ao2.b
                                    Hash Module: 3
                                    ->  Partial HashAggregate (actual time=1064.028..1068.028 rows=3385 loops=1)
                                          Group Key: ao2.b
                                          Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                          ->  Parallel Seq Scan on ao2 (actual time=0.000..524.014 rows=848832 loops=1)
    Planning Time: 1.097 ms
      (slice0)    Executor memory: 1273K bytes.
    * (slice1)    Executor memory: 315K bytes avg x 12x(0) workers, 317K bytes max (seg0).  Work_mem: 337K bytes max, 337K bytes wanted.
    * (slice2)    Executor memory: 371K bytes avg x 12x(0) workers, 374K bytes max (seg0).  Work_mem: 241K bytes max, 241K bytes wanted.
    * (slice3)    Executor memory: 918K bytes avg x 12x(0) workers, 920K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
    * (slice4)    Executor memory: 918K bytes avg x 12x(0) workers, 920K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
    Memory used:  128000kB
    Memory wanted:  5260kB
    Optimizer: Postgres query optimizer
    Execution Time: 1226.660 ms
   (43 rows)
   
   ```
   
   #### case[0]
    
   ```sql
   -- Two append-optimized, column-oriented tables with the same schema.
   create table ao1(a int, b int) using ao_column;
   create table ao2(a int, b int) using ao_column;
   insert into ao1 select i, i+1 from generate_series(1, 10000) i;
   
   -- Each self-insert doubles the number of rows in ao1.
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   insert into ao1 select * from ao1;
   analyze ao1;
   -- Copy the data into ao2 so both UNION branches scan the same volume.
   insert into ao2 select * from ao1;
   analyze ao2;
   ```
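   
   The statement that produced the two timed plans is not quoted in this
   description. A minimal sketch of how the comparison could be reproduced,
   assuming the query is a UNION of two grouped counts (inferred from the
   `Group Key` lines in the plans) and assuming the session settings below are
   what select the 4-parallel plan:
   
   ```sql
   -- Assumption: these GUC values are illustrative; the PR does not state them.
   set optimizer = off;                      -- plans report "Postgres query optimizer"
   set enable_parallel = on;                 -- allow parallel plans in CBDB
   set min_parallel_table_scan_size = 0;     -- consider parallel scans for any table size
   set max_parallel_workers_per_gather = 4;  -- 4 workers per segment

   -- Assumption: query text reconstructed from the plan, not quoted from the PR.
   explain (analyze, costs off)
   select b, count(a) from ao1 group by b
   union
   select b, count(a) from ao2 group by b;
   ```
   
   Running the same EXPLAIN with `enable_parallel = off` should give the
   no-parallel baseline for comparison.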
   
   
   
   
   Authored-by: Zhang Mingli [email protected]
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

