Your investigation into bucketing is spot-on: it's the right foundation for
eliminating the costly shuffle. The key issue is that Hive's standard join
(the Common Join) shuffles both inputs on the join key, which causes heavy
network I/O and sorting overhead. Your single-partition query is fast because
Hive converted it to a bucket-based merge join (sort-merge bucket, SMB); you
need that same optimization to apply to your multi-partition query.
Here is a structured solution plan to achieve shuffle‑free joins across
multiple partitions.
1. Ensure the Bucket Map Join (BMJ) Prerequisite
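For Hive to convert the join automatically into a bucket-based join, all joining tables must be:
Bucketed on the join key consumer_id with the same number of buckets (a plain bucket map join also tolerates bucket counts that are multiples of each other, but equal counts are the safe choice).
Sorted on consumer_id within each bucket (SORTED BY consumer_id); this is what the sort-merge (SMB) variant requires.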
Your table definition should look like:
```sql
CREATE TABLE my_table (...)
PARTITIONED BY (record_date STRING)
CLUSTERED BY (consumer_id) SORTED BY (consumer_id) INTO 100 BUCKETS
STORED AS ORC;
```
Why sorting matters
Sorting within each bucket allows Hive to use an efficient merge‑join
directly on the files, without any shuffle or extra sorting.
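Data that was loaded before bucketing was declared is not reorganized retroactively, so it has to be rewritten into the bucketed table. A minimal sketch of that rewrite, assuming a hypothetical unbucketed source table my_table_raw with hypothetical columns col_a and col_b, and reusing the example dates from Section 3:

```sql
-- Hypothetical source: my_table_raw (unbucketed copy of the data);
-- col_a and col_b stand in for the real columns.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- Hive 1.x only: omit these two lines on Hive 2.0+, where the properties
-- were removed and bucketing/sorting are always enforced on insert.
SET hive.enforce.bucketing=true;
SET hive.enforce.sorting=true;

-- Dynamic-partition insert: Hive hashes consumer_id into the 100 buckets
-- declared in my_table's DDL and sorts each bucket file.
INSERT OVERWRITE TABLE my_table PARTITION (record_date)
SELECT consumer_id,
       col_a,
       col_b,
       record_date  -- the dynamic partition column must come last
FROM my_table_raw
WHERE record_date BETWEEN '2026-05-01' AND '2026-05-07';
```

Writing one partition at a time with a static PARTITION (record_date='...') clause works the same way and keeps each job smaller.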
2. Activate the Critical Set of Hive Parameters
Enable both bucket map join and its advanced form (SMB) with the following
settings:
```sql
set hive.auto.convert.join=true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.auto.convert.join.noconditionaltask.size=...; -- see §5
```
These parameters tell Hive to prefer the bucket‑aware, shuffle‑free join
strategies whenever the table definitions allow it.
Important: hive.optimize.bucketmapjoin.sortedmerge is one of the switches
for the "Sort-Merge Bucket Map Join". It does not by itself guarantee that
the conversion succeeds when several partitions are read; that depends on
your Hive version (see Section 6).
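Besides the flags above, Hive on Tez has a separate switch for converting a join into a (non-sorted) bucket map join, and as far as I know its default has differed between releases, so it is worth setting explicitly. A sketch, assuming a Hive release with Tez bucket map join support:

```sql
-- Run on the Tez engine and allow Tez to build one hash table per bucket
-- (bucket map join) instead of broadcasting the whole small side.
SET hive.execution.engine=tez;
SET hive.convert.join.bucket.mapjoin.tez=true;
```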
3. Treat Partitions as Logical Buckets
You already partition by record_date and bucket by consumer_id. The two
concepts work together: each partition contains 100 bucket files. For a
join across several dates, Hive should read bucket i from every selected
partition of each table and join those files together, rather than
shuffling the whole dataset.
A known JIRA (HIVE‑9523) tracks making “partition map join” work as
efficiently as bucket map join, but until it is fully resolved you can
achieve the same effect by designing your query to explicitly join
bucket‑to‑bucket across partitions.
How to write the query
Keep the join condition on consumer_id alone and put a partition filter on
every table, so that each table contributes only the bucket files of the
selected dates:
```sql
SELECT ...
FROM table1 t1
JOIN table2 t2 ON t1.consumer_id = t2.consumer_id
JOIN table3 t3 ON t1.consumer_id = t3.consumer_id
...
WHERE t1.record_date IN ('2026-05-01', '2026-05-02', ...) -- up to 7 days
  AND t2.record_date IN (...) -- same list
  AND t3.record_date IN (...)
```
When all tables are bucketed and sorted identically, Hive can generate a
plan that reads, for each bucket, only the matching bucket files of the
selected partitions. That is the shuffle-free bucket join across partitions;
whether your Hive version actually produces it for multi-partition inputs is
what the EXPLAIN check below shows.
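If your logic only needs rows from the same record_date to match (an assumption: a consumer's rows on different days would then never join), another option is to exploit the case you already saw working, one partition per table. The sketch below joins only two of the seven tables, uses a hypothetical column amount, and combines per-day branches with UNION ALL; each branch reads a single partition per table, so it has a chance to be planned as a merge join on its own.

```sql
-- Assumes same-day matching; "amount" is a hypothetical column.
-- Add t3..t7 to each branch and one branch per date in the range.
SELECT u.consumer_id, u.amount
FROM (
  SELECT t1.consumer_id, t2.amount
  FROM table1 t1
  JOIN table2 t2 ON t1.consumer_id = t2.consumer_id
  WHERE t1.record_date = '2026-05-01' AND t2.record_date = '2026-05-01'
  UNION ALL
  SELECT t1.consumer_id, t2.amount
  FROM table1 t1
  JOIN table2 t2 ON t1.consumer_id = t2.consumer_id
  WHERE t1.record_date = '2026-05-02' AND t2.record_date = '2026-05-02'
) u;
```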
Probing tip: Before running the full query, run EXPLAIN EXTENDED on a plain
equi-join on consumer_id to verify that no shuffle appears (do not add a
DISTRIBUTE BY clause to the test; it would introduce a shuffle of its own):
```sql
EXPLAIN EXTENDED
SELECT ... FROM table1 t1 JOIN table2 t2 ON t1.consumer_id = t2.consumer_id;
```
Look for a “Merge Join Operator” or “Map Join Operator” inside a Map vertex,
and the absence of a Reducer vertex that performs the join.
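A cheap way to run that check on more than one partition, before launching the full week, is a two-day, two-table probe. EXPLAIN only compiles the query, so it is safe to run against the 8 TB tables; the plan text quoted in the comments is what Hive on Tez typically prints and may differ slightly between versions.

```sql
-- Compile only; nothing is executed.
EXPLAIN
SELECT t1.consumer_id
FROM table1 t1
JOIN table2 t2 ON t1.consumer_id = t2.consumer_id
WHERE t1.record_date IN ('2026-05-01', '2026-05-02')
  AND t2.record_date IN ('2026-05-01', '2026-05-02');
-- Good sign: the join operator sits inside a "Map" vertex.
-- Shuffle: an edge such as "Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)"
-- with the join operator inside Reducer 2.
```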
4. Guarantee That All Partitions Follow the Same Bucketing Scheme
A common hidden problem is that later partitions may have a different
bucket count or missing sort order, which breaks the pre‑condition for
bucket map join. Verify with:
```sql
DESCRIBE FORMATTED table1 PARTITION (record_date='2026-05-01');
```
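Check that Num Buckets, Bucket Columns and Sort Columns match across all
participating partitions. When writing new partitions (e.g. via INSERT
OVERWRITE), insert into the bucketed table and let Hive apply the bucketing
and sorting declared in its DDL:
```sql
INSERT OVERWRITE TABLE my_table PARTITION (record_date='...')
SELECT ...;
```
The CLUSTERED BY ... SORTED BY ... INTO n BUCKETS clause belongs only in the
table DDL; it is not valid in an INSERT statement. On Hive 1.x, also set
hive.enforce.bucketing=true; and hive.enforce.sorting=true; so that Hive
respects the bucketing and sorting of the destination table. On Hive 2.0 and
later these properties were removed and enforcement is always on.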
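Bucket layout can also differ by hashing scheme, not only by bucket count. Hive 3.0 switched bucketing to a Murmur3-based hash and records the scheme in a table property, bucketing_version (2 for the new scheme, 1 for the old one); as far as I know, tables with different versions cannot be joined bucket-to-bucket. If any of the seven tables was created under a different Hive release, a sketch of the comparison (Hive 3.x only):

```sql
-- Hive 3.x: compare the hashing scheme of every table in the join.
SHOW TBLPROPERTIES table1('bucketing_version');
SHOW TBLPROPERTIES table2('bucketing_version');
-- ...repeat for table3 through table7; all values should be equal.
```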
5. Tune Memory to Enable the Bucket Map Join
Unlike the SMB join, which streams sorted buckets from disk, a bucket map
join still loads the smaller side(s) of the join into memory, one bucket's
worth per task. With tables of around 8 TB, the "smaller" side of a
multi-table join can still be several hundred GB for a week of data, far
too much to broadcast whole, so this only works when each bucket is small
enough. Set the threshold to what a single task can actually hold:
```sql
set hive.auto.convert.join.noconditionaltask.size=2147483648; -- 2 GB
```
If the small side exceeds this limit, Hive falls back to the expensive
shuffle join. Check the EXPLAIN output to see whether the join was actually
converted.
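The hash tables are built inside the task containers, so also increase
container memory (and the JVM heap inside it) to accommodate them:
```sql
set hive.tez.container.size=4096;  -- MB (4 GB); adjust to your YARN limits
set hive.tez.java.opts=-Xmx3276m;  -- heap, commonly about 80% of the container
```
tez.am.resource.memory.mb sizes the Tez application master, which does not
hold the broadcast data, so it does not need to grow for this.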
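A rough estimate shows why per-bucket size is what matters. In a bucket map join, each task loads only the small-side bucket(s) matching its own bucket, so its in-memory size is roughly the small side's size over the selected partitions, S_small, divided by the bucket count B. The numbers below are hypothetical (a 500 GB weekly slice), and an in-memory hash table is usually larger than the compressed ORC data it was built from.

```latex
S_{\text{task}} \approx \frac{S_{\text{small}}}{B}, \qquad
\frac{500\ \text{GB}}{100} = 5\ \text{GB}, \qquad
\frac{500\ \text{GB}}{500} = 1\ \text{GB}
```

With 100 buckets such a slice would exceed a 2 GB threshold even per bucket, which is why the SMB join, streaming sorted buckets instead of holding them in memory, is the better target at this scale.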
6. Strategy for Missing Sorted‑Bucket Support (Workaround)
If you discover that your Hive version does not support SMB joins across
multiple partitions (a known limitation), you can still avoid shuffling by
pre‑joining the data at the bucket level:
Create a “fact” table that already contains all consumer activity for the
required partitions, bucketed by consumer_id and sorted.
Pre‑compute the join for each bucket separately using a small script or a
Tez DAG that processes one bucket at a time.
Use a UNION ALL of the results from each bucket. This is more complex and
only recommended as a last resort.
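One way to process a single bucket at a time inside Hive is bucket sampling: when the TABLESAMPLE column is the table's CLUSTERED BY column, Hive reads only the matching bucket files. The sketch below assumes a hypothetical result table joined_by_bucket, a hypothetical column amount, and an external script that substitutes k = 1..100 for the bucket number; it is only correct if both tables have the same bucket count and hashing scheme.

```sql
-- Bucket k = 1 shown; an external script repeats this for k = 2..100.
-- Each run joins about 1/100 of the data, which is more likely to fit a map join.
INSERT INTO TABLE joined_by_bucket
SELECT t1.consumer_id, t2.amount
FROM table1 TABLESAMPLE(BUCKET 1 OUT OF 100 ON consumer_id) t1
JOIN table2 TABLESAMPLE(BUCKET 1 OUT OF 100 ON consumer_id) t2
  ON t1.consumer_id = t2.consumer_id
WHERE t1.record_date IN ('2026-05-01', '2026-05-02')
  AND t2.record_date IN ('2026-05-01', '2026-05-02');
```

Appending each run with INSERT INTO plays the role of the UNION ALL step.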
A cleaner alternative is to change the layout itself: for example, bucket
only by consumer_id and do not partition by date, or partition by a coarser
date range (e.g. one week) and bucket by consumer_id within each range, so
that a query aligned with those ranges reads a single partition per table,
the case that already works for you. Since each table holds around 8 TB,
consider keeping partitions but increasing the bucket count (e.g., 200
buckets) to make per-bucket data sizes smaller and more amenable to
broadcast.
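Changing the bucket count is a two-step operation. ALTER TABLE only updates metadata: existing files keep their 100-bucket layout, and as far as I know existing partitions keep their own bucket metadata, which is exactly the mismatch Section 4 warns about. A sketch, assuming all seven tables are changed the same way and the old data is then rewritten:

```sql
-- Metadata only: affects partitions written from now on.
ALTER TABLE my_table CLUSTERED BY (consumer_id) SORTED BY (consumer_id) INTO 200 BUCKETS;
-- Then rewrite every partition the queries will read (e.g. INSERT OVERWRITE
-- from a copy of the data) and apply the same change to the other six tables.
```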
7. Additional Tez‑Specific Optimizations
Beyond the bucket join settings, apply these Tez adjustments to reduce
overhead:
Enable vectorized execution (you already have it – good)
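Allow independent stages to run in parallel (this mainly helps multi-stage
plans; a Tez query usually compiles to a single DAG, so expect limited
gains):
```sql
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
```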
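Use the cost-based optimizer to choose the join order; it relies on table
and column statistics:
```sql
set hive.cbo.enable=true;
set hive.stats.fetch.column.stats=true;
set hive.compute.query.using.stats=true; -- answers simple aggregates such as count(*) from stats
```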
Collect column statistics on all partitions (the CBO needs them):
```sql
ANALYZE TABLE my_table PARTITION (record_date) COMPUTE STATISTICS FOR COLUMNS;
```
Reduce the data read by projecting only the columns you need. Filters on
record_date enable partition pruning, so include them for every table; ORC
predicate push-down then applies to filters on ordinary columns, letting the
reader skip stripes and row groups.
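Both points can be checked without running the join. The sketch below enables ORC predicate push-down for non-partition predicates and uses EXPLAIN DEPENDENCY, which lists the tables and partitions a query would read, to confirm that pruning limits each table to the requested dates (two tables and two dates shown for brevity).

```sql
-- Push non-partition predicates into the ORC reader so it can skip
-- stripes/row groups using ORC's min/max indexes.
SET hive.optimize.ppd=true;
SET hive.optimize.index.filter=true;

-- Prints the input tables and partitions (JSON); expect only the
-- requested record_date partitions for each table.
EXPLAIN DEPENDENCY
SELECT t1.consumer_id
FROM table1 t1
JOIN table2 t2 ON t1.consumer_id = t2.consumer_id
WHERE t1.record_date IN ('2026-05-01', '2026-05-02')
  AND t2.record_date IN ('2026-05-01', '2026-05-02');
```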
8. When All Else Fails: Fallback to Skew Join
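If, after all these optimizations, a few tasks still run far longer than the
rest because consumer_id is unevenly distributed (some keys huge, others
tiny), activate skew join handling:
```sql
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000; -- treat a key as skewed above 100k rows (the default)
```
Hive then handles rows for keys above the threshold in a separate follow-up
map join, so a few heavy keys no longer stall a single task. This does not
remove the shuffle for the remaining keys, and support for runtime skew join
differs between execution engines, so check with EXPLAIN whether your Hive
version applies it under Tez.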
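Before enabling it, it is worth confirming that skew is really the problem. A sketch of a check on one table over the query's date range (two dates shown); compare the top counts with hive.skewjoin.key:

```sql
-- Heaviest join keys in table1 for the selected dates.
SELECT consumer_id, count(*) AS row_count
FROM table1
WHERE record_date IN ('2026-05-01', '2026-05-02')
GROUP BY consumer_id
ORDER BY row_count DESC
LIMIT 20;
```

Running the same query on the other six tables shows whether the same few keys are heavy everywhere.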
Summary of Immediate Actions
Verify that every table is defined with CLUSTERED BY (consumer_id) SORTED
BY (consumer_id) INTO 100 BUCKETS.
Set the four bucket parameters (Section 2) in your session or globally.
Re‑write your query to include partition filters for all tables and ensure
the join condition is only consumer_id = consumer_id.
Raise the broadcast threshold only as far as task memory allows (Section
5); the SMB join itself does not depend on it.
Test on a subset of 2–3 partitions first, using EXPLAIN EXTENDED to confirm
no shuffle appears.
If the joins convert to merge-based bucket joins, most of the 20–30 minutes
your 7-table, 7-partition query now spends shuffling should disappear;
measure on your own data to see the actual gain.
On Mon, 26 Aug 2019 at 19:01, Soupam Mandal <[email protected]> wrote:
> We have 7 tables and each table is partitioned by record_date. There is a
> query which involves inner join with all these tables and join is based on
> consumer_id. The join involves multiple partition join. Currently querying
> 1 week data takes very long time around 20-30 mins. We want to optimize
> this query. The root cause of this slowness is the data shuffling and map
> reduce. Each table contains around 8TB data on compression. Table is
> compressed with ORC-Zlib.
>
> We tried bucketing along with the partitioning. So we bucketed each
> partition of the table into 100 buckets based on the consumer_id as
> consumer_id is the joining key. When there is a query involving one
> partition there is improvement in the query perf as it uses merge join. But
> whenever there is a query involving multiple partitions the query plan
> starts shuffling data and no improvements in query perf. We are using hive
> tez engine and vectorization is enabled. Can anyone suggest what approach we
> should follow when there is query with multiple tables join and multiple
> partitions?
>