avamingli opened a new pull request, #1173:
URL: https://github.com/apache/cloudberry/pull/1173

   Upstream PostgreSQL does not support parallel DISTINCT processing, since 
uniqueness cannot be guaranteed across multiple workers. In MPP databases, 
however, we can use a Motion to redistribute tuples across the workers of a 
parallel query, so that equal values end up on the same worker.
   
   For a DISTINCT query like:
   ```sql
   select distinct a from t_distinct_0;
   ```
   we can build a parallel plan on top of a Parallel Scan of the table. After 
the Parallel Scan, the tuples are randomly distributed among the workers, even 
when the distribution key matches the target expression.
   
   A pre-distinct node (Streaming HashAggregate or HashAggregate) removes some 
duplicates within each worker in parallel. The remaining tuples are then 
redistributed according to the DISTINCT expressions, and a second-stage 
aggregate completes the DISTINCT operation.
   ```sql
                            QUERY PLAN
   ------------------------------------------------------------
    Gather Motion 6:1  (slice1; segments: 6)
      ->  HashAggregate
            Group Key: a
            ->  Redistribute Motion 6:6  (slice2; segments: 6)
                  Hash Key: a
                  Hash Module: 3
                  ->  Streaming HashAggregate
                        Group Key: a
                        ->  Parallel Seq Scan on t_distinct_0
    Optimizer: Postgres query optimizer
   (10 rows)
   ```
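
   The two-stage plan above can be modelled in a few lines of Python. This is 
only a toy sketch, not Cloudberry's executor: the function name 
`parallel_distinct`, the `workers` and `seed` parameters, and the use of 
Python's built-in `hash` in place of the Motion hash function are all 
assumptions made for illustration.

```python
import random


def parallel_distinct(values, workers=6, seed=0):
    """Toy model of two-stage parallel DISTINCT on a single column (hypothetical helper)."""
    rng = random.Random(seed)

    # Parallel Seq Scan: each tuple lands on an arbitrary worker.
    scanned = [[] for _ in range(workers)]
    for v in values:
        scanned[rng.randrange(workers)].append(v)

    # Stage 1 (pre-distinct): every worker drops its own local duplicates.
    partial = [set(chunk) for chunk in scanned]

    # Redistribute Motion: route each surviving value by a hash of the
    # DISTINCT expression, so equal values always meet on the same worker.
    # Python's hash() is a stand-in for the real hash function.
    redistributed = [[] for _ in range(workers)]
    for chunk in partial:
        for v in chunk:
            redistributed[hash(v) % workers].append(v)

    # Stage 2: final deduplication per worker; the Gather Motion is modelled
    # by concatenating the per-worker results.
    result = []
    for chunk in redistributed:
        result.extend(set(chunk))
    return result


rows = [i % 100 for i in range(10_000)]
distinct_rows = parallel_distinct(rows)
print(len(distinct_rows))  # 100
```

   Stage 1 alone cannot guarantee uniqueness, because the same value may 
survive on several workers. Only the hash-based redistribution makes the second 
stage correct.
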
   Parallel Group Aggregation is also supported:
   ```sql
   explain(costs off)
   select distinct a, b from t_distinct_0;
                           QUERY PLAN
   -----------------------------------------------------------
    GroupAggregate
      Group Key: a, b
      ->  Gather Motion 6:1  (slice1; segments: 6)
            Merge Key: a, b
            ->  GroupAggregate
                  Group Key: a, b
                  ->  Sort
                        Sort Key: a, b
                        ->  Parallel Seq Scan on t_distinct_0
    Optimizer: Postgres query optimizer
   (10 rows)
   ```
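
   The sort-based plan above can be sketched the same way. This is a toy 
model under stated assumptions: `dedup_sorted` and `parallel_group_distinct` 
are hypothetical names, and `heapq.merge` stands in for the order-preserving 
Gather Motion with a Merge Key.

```python
import heapq
import random


def dedup_sorted(stream):
    """Emit a row only when it differs from the previous one (sorted input)."""
    previous = object()  # sentinel that compares unequal to every row
    for row in stream:
        if row != previous:
            yield row
            previous = row


def parallel_group_distinct(rows, workers=6, seed=0):
    """Toy model of Sort + GroupAggregate per worker, merged and deduplicated again."""
    rng = random.Random(seed)

    # Parallel Seq Scan: each tuple lands on an arbitrary worker.
    scanned = [[] for _ in range(workers)]
    for row in rows:
        scanned[rng.randrange(workers)].append(row)

    # Per worker: Sort on (a, b), then drop adjacent duplicates.
    partial = [list(dedup_sorted(sorted(chunk))) for chunk in scanned]

    # Gather with Merge Key: merge the sorted streams while keeping order.
    merged = heapq.merge(*partial)

    # Final pass: duplicates that survived on different workers are now
    # adjacent in the merged stream and can be dropped.
    return list(dedup_sorted(merged))


pairs = [(i % 5, i % 3) for i in range(1000)]
print(parallel_group_distinct(pairs) == sorted(set(pairs)))  # True
```

   No redistribution is needed here because the merged stream is globally 
sorted, so one final pass over it sees all equal rows next to each other.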
   
   ## Performance
   
   For DISTINCT, parallel performance depends heavily on how the tuples are 
spread among the worker processes, particularly for Streaming HashAggregate. 
   Overall, the test cases indicate that a parallel execution plan with two 
workers takes roughly half the runtime of a non-parallel plan.
   
   DISTINCT plan | run 1 | run 2 | run 3 | avg (parallel plans: 2 workers)
   -- | -- | -- | -- | --
   non-parallel | 8207.833 | 8416.01 | 8163.197 | 8262.346667
   hashagg | 4550.331 | 4558.036 | 4219.485 | 4442.617333
   streaming-hashagg | 4197.242 | 4200.889 | 4099.575 | 4165.902
   hashagg-groupagg | 4336.934 | 4710.07 | 4352.795 | 4466.599667
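
   The averages and the "roughly half" claim can be rechecked from the raw 
timings in the table. The snippet below is just that arithmetic. The dictionary 
keys mirror the row labels, and the timing unit is whatever the table reports.

```python
# Timings copied from the table above (three runs per plan).
timings = {
    "non-parallel": [8207.833, 8416.01, 8163.197],
    "hashagg": [4550.331, 4558.036, 4219.485],
    "streaming-hashagg": [4197.242, 4200.889, 4099.575],
    "hashagg-groupagg": [4336.934, 4710.07, 4352.795],
}

# Mean runtime per plan, and the non-parallel mean as the baseline.
averages = {plan: sum(runs) / len(runs) for plan, runs in timings.items()}
baseline = averages["non-parallel"]

for plan, avg in averages.items():
    ratio = avg / baseline      # fraction of the non-parallel runtime
    speedup = baseline / avg    # how many times faster than non-parallel
    print(f"{plan:18s} avg={avg:10.3f} ratio={ratio:.2f} speedup={speedup:.2f}x")
```

   With these numbers, each parallel plan runs in about 0.50 to 0.54 of the 
non-parallel time, and streaming-hashagg is the fastest of the three.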
   
   
   
   
![chart](https://github.com/user-attachments/assets/9a0d70b6-5419-4ae6-a1bc-ad8728fe0ef9)
   
   ### non-parallel
   ```sql
   select distinct a from t_distinct_0;
                            QUERY PLAN
   ------------------------------------------------------------
    Gather Motion 3:1  (slice1; segments: 3)
      ->  HashAggregate
            Group Key: a
            ->  Redistribute Motion 3:3  (slice2; segments: 3)
                  Hash Key: a
                  ->  HashAggregate
                        Group Key: a
                        ->  Seq Scan on t_distinct_0
    Optimizer: Postgres query optimizer
   (9 rows)
   ```
   
   ### hashagg 
   ```sql
   select distinct a from t_distinct_0;
                            QUERY PLAN
   ------------------------------------------------------------
    Gather Motion 6:1  (slice1; segments: 6)
      ->  HashAggregate
            Group Key: a
            ->  Redistribute Motion 6:6  (slice2; segments: 6)
                  Hash Key: a
                  Hash Module: 3
                  ->  HashAggregate
                        Group Key: a
                        ->  Parallel Seq Scan on t_distinct_0
    Optimizer: Postgres query optimizer
   (10 rows)
   ```
   
   ### streaming-hashagg
   
   ```sql
   select distinct a from t_distinct_0;
                            QUERY PLAN
   ------------------------------------------------------------
    Gather Motion 6:1  (slice1; segments: 6)
      ->  HashAggregate
            Group Key: a
            ->  Redistribute Motion 6:6  (slice2; segments: 6)
                  Hash Key: a
                  Hash Module: 3
                  ->  Streaming HashAggregate
                        Group Key: a
                        ->  Parallel Seq Scan on t_distinct_0
    Optimizer: Postgres query optimizer
   (10 rows)
   ```
   
   
   
   ### hashagg-groupagg 
   ```sql
   select distinct a from t_distinct_0;
                               QUERY PLAN
   ------------------------------------------------------------------
    Gather Motion 6:1  (slice1; segments: 6)
      Merge Key: a
      ->  GroupAggregate
            Group Key: a
            ->  Sort
                  Sort Key: a
                  ->  Redistribute Motion 6:6  (slice2; segments: 6)
                        Hash Key: a
                        Hash Module: 3
                        ->  Streaming HashAggregate
                              Group Key: a
                              ->  Parallel Seq Scan on t_distinct_0
    Optimizer: Postgres query optimizer
   (13 rows)
   ```
   
   Authored-by: Zhang Mingli [email protected]
   
   


