[
https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Junrui Lee updated FLINK-37161:
-------------------------------
Description:
Flink 2.0 supports adaptive skewed join optimization for batch jobs. It allows
the Join operator to dynamically split skewed and splittable partitions based
on runtime input statistics, thereby mitigating the long-tail problem caused by
skewed data.
We may need the following tests:
#
Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is
set to {{auto}}. Construct a simple join case with data skewed on a single key
(e.g., make the data volume of a specified join key N times larger than that of
the other join keys, where N is defined by
{{table.optimizer.skewed-join-optimization.skewed-factor}}), and ensure that
the data volume for the skewed join key exceeds the skewed threshold (defined
by {{table.optimizer.skewed-join-optimization.skewed-threshold}}). Finally,
observe whether the ratio of the maximum data volume to the median data volume
processed by concurrent join tasks is less than the skewed factor.
#
Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is
set to {{forced}}. Construct a skewed join instance similar to Test 1, with one
difference: connect the join to a downstream operator that hashes on the same
field (e.g., hash aggregation or group by). It is recommended to set different
parallelisms for the join operator and the downstream operator to prevent the
output edge from being optimized to a forward edge. Finally, observe whether
the ratio of the maximum data volume to the median data volume processed by
concurrent join tasks is less than the skewed factor.
#
Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is
set to {{none}}, and verify that the join operator is not optimized into an
adaptive join operator under any circumstances.
#
Test the case with a customized
{{table.optimizer.skewed-join-optimization.skewed-factor}}. Construct a skewed
join instance similar to Test 1, set different skewed factors, and observe
whether the ratio of the maximum data volume to the median data volume
processed by concurrent join tasks is less than the skewed factor. Note that
Flink can currently reduce the ratio only to 2.0, so ensure that the
skewed-factor is greater than 2.0 during testing.
#
Test the case with a customized
{{table.optimizer.skewed-join-optimization.skewed-threshold}}. Construct a
skewed join instance similar to Test 1, set different skewed-threshold values,
and observe whether the optimization takes effect only when the data volume
processed by the skewed join instance is greater than the skewed threshold.
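
The pass/fail check shared by Tests 1, 2 and 4 can be sketched as a small helper. This is only an illustration under stated assumptions, not part of Flink: the function names ({{make_skewed_keys}}, {{max_to_median_ratio}}, {{skew_mitigated}}) are hypothetical, and the per-subtask volumes are assumed to have been collected by the tester by whatever means are available. The sample numbers are made up for the example.

```python
from collections import Counter
from statistics import median


def make_skewed_keys(num_keys, rows_per_key, skewed_key, skew_multiplier):
    """Hypothetical generator: return join keys in which `skewed_key`
    occurs `skew_multiplier` times as often as every other key."""
    keys = []
    for key in range(num_keys):
        copies = rows_per_key * (skew_multiplier if key == skewed_key else 1)
        keys.extend([key] * copies)
    return keys


def max_to_median_ratio(volumes):
    """Ratio of the largest to the median data volume across join subtasks."""
    if not volumes:
        raise ValueError("need at least one subtask volume")
    mid = median(volumes)
    if mid <= 0:
        raise ValueError("median volume must be positive")
    return max(volumes) / mid


def skew_mitigated(volumes, skewed_factor):
    """True if the max/median ratio is below the configured skewed factor."""
    return max_to_median_ratio(volumes) < skewed_factor


if __name__ == "__main__":
    # Key 3 carries 10x the rows of any other key (illustrative values only).
    keys = make_skewed_keys(num_keys=8, rows_per_key=100, skewed_key=3,
                            skew_multiplier=10)
    rows_per_key = Counter(keys)
    # Assumption for the example: one key per subtask, no optimization applied.
    volumes = [rows_per_key[k] for k in sorted(rows_per_key)]
    print("max/median ratio:", max_to_median_ratio(volumes))
    print("below skewed factor 4.0:", skew_mitigated(volumes, 4.0))
```

The median is used as the baseline because that is how the tests above state the criterion. The same helper can be reused with each customized skewed factor in Test 4.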
was:
This is for cross-team verification of the release 2.0 work item: "Adaptive
skewed join optimization for batch jobs"
Please complete the following steps before Jan 31.
# Replace the description of the ticket with instructions for the testers.
** If the work item does not need cross-team verification, please comment that
explicitly and close the ticket.
# Un-assign yourself after adding the instructions.
# Either find a cross-team tester who is willing to help and assign to the
ticket, or reach out to the release manager [~xtsong].
Thanks for your contributions. For any questions, feel free to reach out to the
release manager [~xtsong].
> Cross-team verification for "Adaptive skewed join optimization for batch jobs"
> ------------------------------------------------------------------------------
>
> Key: FLINK-37161
> URL: https://issues.apache.org/jira/browse/FLINK-37161
> Project: Flink
> Issue Type: Sub-task
> Reporter: Junrui Lee
> Assignee: Lei Yang
> Priority: Blocker
> Fix For: 2.0.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)