[ https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xingcan Cui updated FLINK-34926:
--------------------------------
    Attachment: image.png

> Adaptive auto parallelism doesn't work for a query
> --------------------------------------------------
>
>                 Key: FLINK-34926
>                 URL: https://issues.apache.org/jira/browse/FLINK-34926
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.1
>            Reporter: Xingcan Cui
>            Priority: Major
>         Attachments: image.png
>
>
> We have the following query running in batch mode.
> {code:sql}
> WITH FEATURE_INCLUSION AS (
>   SELECT
>     insertion_id, -- Not unique
>     features -- Array<Row<key, value>>
>   FROM
>     features_table
> ),
> TOTAL AS (
>   SELECT
>     COUNT(DISTINCT insertion_id) total_id
>   FROM
>     FEATURE_INCLUSION
> ),
> FEATURE_INCLUSION_COUNTS AS (
>   SELECT
>     `key`,
>     COUNT(DISTINCT insertion_id) AS id_count
>   FROM
>     FEATURE_INCLUSION,
>     UNNEST(features) AS t (`key`, `value`)
>   WHERE
>     TRUE
>   GROUP BY
>     `key`
> ),
> RESULTS AS (
>   SELECT
>     `key`
>   FROM
>     FEATURE_INCLUSION_COUNTS,
>     TOTAL
>   WHERE
>     (1.0 * id_count) / total_id > 0.1
> )
> SELECT
>   JSON_ARRAYAGG(`key`) AS feature_ids
> FROM
>   RESULTS
> {code}
> The parallelism that Flink adaptively set for the following operators was always 1.
> {noformat}
> [37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, insertion_id])
> +- [38]:LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT(insertion_id) AS count$0])
> {noformat}
> If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and manually set `parallelism.default` to a value greater than one, the operators run with that parallelism as expected.
> A screenshot of the full job graph is attached. !image_720.png!
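The workaround described in the report can be sketched as Flink SQL client statements. This is a minimal sketch under stated assumptions: the parallelism value 8 is a hypothetical example (the report only says "greater than one"), and depending on the deployment the scheduler option may have to be set in the cluster configuration rather than per session for it to take effect.

{code:sql}
-- Run the query as a batch job, as in the report.
SET 'execution.runtime-mode' = 'batch';
-- Turn off adaptive auto-parallelism so the scheduler does not derive operator parallelism itself.
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';
-- Hypothetical value: any parallelism greater than one reproduces the workaround.
SET 'parallelism.default' = '8';
{code}

With these settings every operator, including the HashAggregate/LocalHashAggregate chain above, uses the fixed default parallelism instead of the adaptively computed one; this sidesteps the issue rather than fixing the adaptive decision.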