[ https://issues.apache.org/jira/browse/HBASE-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090751#comment-18090751 ]

JeongMin Ju commented on HBASE-30238:
-------------------------------------

Thanks for checking this, and sorry for the confusion.

Yes, HBASE-30237 was originally created by me. I created it through the Apache 
Jira API using a token that I had generated for my account, but it was 
unexpectedly recorded with a different reporter account. I could not determine 
why that happened, and I also could not correct the reporter on that issue 
myself.

Because of that, I created HBASE-30238 with the same content under my account 
and moved the PR to reference HBASE-30238 instead.

So closing HBASE-30237 and keeping HBASE-30238 is the right outcome. Sorry 
again for the duplicated issue.

> HBase bulkload replication causes duplicate HFile loading and compaction 
> storm when source RPC times out
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-30238
>                 URL: https://issues.apache.org/jira/browse/HBASE-30238
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>    Affects Versions: 2.5.15, 2.6.6
>            Reporter: JeongMin Ju
>            Assignee: JeongMin Ju
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Problem
> When bulkload replication is enabled (A to B), the sink cluster (B) copies 
> HFiles from the source cluster's (A) HDFS and then performs a bulk load. If 
> this operation takes longer than {{replication.source.shipedits.timeout}} 
> (default 60s), the source retries the same WAL entry. Since 
> {{HFileReplicator}} creates a new random staging directory on every 
> invocation, the sink can process the same bulkload event multiple times -- 
> loading the same HFiles as distinct store files repeatedly.
> This leads to:
> * *Storefile size explosion*: the sink's store file size can grow to 3x the 
> source's size or more.
> * *Compaction storm*: the duplicated store files trigger massive compaction 
> work.
> * *Replication lag*: the source accumulates WAL entries while the sink is 
> overwhelmed.
> The root cause is that HBase bulkload replication has at-least-once delivery 
> semantics, but {{ReplicationSink}} historically had no event tracking 
> mechanism to prevent the same bulkload WAL event from being processed 
> concurrently, or processed again after it had already completed.
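To make the failure mode concrete, here is a toy Java model (hypothetical classes, not HBase code) of a sink with no event tracking. Each delivery of the same WAL entry is staged in a fresh random directory, as the description says {{HFileReplicator}} does, and loaded as new store files. The RPC timeout is simulated with a counter rather than a real RPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy sink with no event tracking (hypothetical, for illustration only). */
class NaiveBulkLoadSink {
  final List<String> storeFiles = new ArrayList<>();

  // walEntryId is deliberately ignored: nothing records which events were loaded.
  void replicate(String walEntryId, List<String> hfiles) {
    // A fresh random staging directory on every invocation.
    String stagingDir = "/staging/" + UUID.randomUUID();
    for (String hfile : hfiles) {
      // Each load adds a distinct store file, even for an HFile loaded before.
      storeFiles.add(stagingDir + "/" + hfile);
    }
  }
}

/** Simulates a source that re-ships one WAL entry after each RPC timeout. */
final class AtLeastOnceDemo {
  /**
   * The first {@code timedOutAttempts} calls complete on the sink but are
   * reported to the source as timeouts, so the source ships the entry again.
   * Returns the total number of deliveries.
   */
  static int deliver(NaiveBulkLoadSink sink, int timedOutAttempts) {
    List<String> hfiles = List.of("f1", "f2", "f3");
    int attempts = 0;
    boolean timedOut;
    do {
      attempts++;
      sink.replicate("wal-entry-42", hfiles);
      timedOut = attempts <= timedOutAttempts;
    } while (timedOut);
    return attempts;
  }
}
```

With two simulated timeouts the entry is delivered three times and the three source HFiles become nine distinct store files, which mirrors the storefile growth described in this issue.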
> h2. Evidence (observed in production)
> In a production bulkload replication setup, the following sequence was 
> observed:
> * A single bulkload event on the source triggered repeated BulkloadRPC calls 
> on the sink -- more invocations than the source fired -- confirming multiple 
> retries of the same WAL entry.
> * Sink RS CPU spiked from baseline ~2% to over 40%.
> * Sink network inbound peaked at several hundred MB/s per node.
> * Sink cluster storefile size grew to over 3x the source within ~90 minutes.
> * Compaction queue on the sink peaked at tens of thousands, requiring hours 
> of major compaction to recover.
> * Source replication lag peaked at ~70 minutes, with WAL queue growing to 
> tens of files per RS.
> h2. Root Cause Analysis
> The execution path is:
> {code}
> ReplicationSourceShipper.shipEdits()
>   -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with 
> shipEditsTimeout]
>     -> ReplicationSink.replicateEntries()
>       -> HFileReplicator.replicate()
>           copyHFilesToStagingDir()      <- copies HFiles from source HDFS 
> (slow)
>           LoadIncrementalHFiles.load()  <- bulk loads into sink table
> {code}
> {{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying 
> large HFiles takes longer than {{shipEditsTimeout}}, the source retries with 
> the same WAL entry. {{HFileReplicator}} creates a new random staging 
> directory each time, so there is no HFile-level detection of an in-progress 
> or already-completed invocation.
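A rough way to see when this happens: if the copy alone, at the effective throughput the sink achieves, lasts longer than {{replication.source.shipedits.timeout}}, the source gives up and retries. The Java helper below is a back-of-envelope sketch with illustrative numbers (not measurements from this issue); it assumes MiB-based units and ignores the time spent in {{LoadIncrementalHFiles.load()}}, which only makes a retry more likely.

```java
/** Back-of-envelope check: does the HFile copy alone outlast the shipEdits timeout? */
final class ShipEditsTimeoutCheck {
  static final double BYTES_PER_MIB = 1024.0 * 1024.0;

  /** Seconds needed to copy {@code bytes} at {@code mibPerSecond} effective throughput. */
  static double copySeconds(long bytes, double mibPerSecond) {
    return bytes / BYTES_PER_MIB / mibPerSecond;
  }

  /** True when the copy alone exceeds the timeout, so the source would retry. */
  static boolean sourceWouldRetry(long bytes, double mibPerSecond, double timeoutSeconds) {
    return copySeconds(bytes, mibPerSecond) > timeoutSeconds;
  }
}
```

For example, 10 GiB of HFiles copied at an effective 100 MiB/s takes 10240 / 100 = 102.4 s, well past the 60 s default, whereas 1 GiB at the same rate takes about 10.2 s.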
> Retries may also be sent to a different target RegionServer. Therefore, a 
> per-{{ReplicationSink}} in-memory guard is not sufficient by itself: the 
> event state must be visible across all RegionServers in the sink cluster.
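The following toy Java model (hypothetical names, not the patch's classes) illustrates why: a guard that deduplicates with a set of completed event ids only works if every RegionServer that might receive the retry consults the same set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sink that skips events already present in its completed-event set. */
class GuardedSink {
  private final Set<String> completedEvents;
  int loads = 0;

  GuardedSink(Set<String> completedEvents) {
    this.completedEvents = completedEvents;
  }

  void replicate(String eventId) {
    // Set.add returns false if the id is already present, so each set lets a
    // given event through only once.
    if (completedEvents.add(eventId)) {
      loads++;
    }
  }
}

final class CrossServerRetryDemo {
  /** Delivers one event to RS1, then retries it on RS2; returns the total number of loads. */
  static int totalLoads(boolean sharedState) {
    Set<String> rs1State = ConcurrentHashMap.<String>newKeySet();
    Set<String> rs2State = sharedState ? rs1State : ConcurrentHashMap.<String>newKeySet();
    GuardedSink rs1 = new GuardedSink(rs1State);
    GuardedSink rs2 = new GuardedSink(rs2State);
    rs1.replicate("peer1,t1,region-a,17");
    rs2.replicate("peer1,t1,region-a,17"); // source retry routed to another RS
    return rs1.loads + rs2.loads;
  }
}
```

With per-server sets the retry loads the event twice; with one shared set it loads once. In the patch, ZooKeeper plays the role of that shared state.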
> Additionally, there was no rate-limiting mechanism for the HFile copy 
> bandwidth, meaning a single large bulkload can saturate the sink cluster's 
> network and cause cascading degradation.
> h2. Fix
> This patch addresses the failure in three layers:
> *1. Configurable bandwidth throttling for HFile copy*
> Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s, 
> default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy 
> threads within a single {{HFileReplicator}} invocation, so the configured 
> bandwidth is a per-RS ceiling regardless of 
> {{hbase.replication.bulkload.copy.maxthreads}}.
> {{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit 
> can be updated dynamically via configuration reload without restarting 
> RegionServers.
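The patch uses Guava's {{RateLimiter}}. To keep this sketch dependency-free, the Java class below is a hypothetical minimal stand-in built on the same idea: one instance shared by every copy thread, each thread blocking until its chunk fits within the configured rate, and a setter that a {{ConfigurationObserver}} callback could call on reload. It assumes 1 MB = 1024 * 1024 bytes and treats a rate of 0 as unlimited, matching the documented default.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical shared copy throttle (stand-in for Guava's RateLimiter). */
class SharedCopyThrottle {
  private static final double BYTES_PER_MB = 1024.0 * 1024.0;
  private double bytesPerSecond; // 0 means unlimited
  private long nextFreeNanos = System.nanoTime();

  SharedCopyThrottle(double bandwidthMb) {
    setBandwidthMb(bandwidthMb);
  }

  /** What a configuration-reload callback could call to change the rate at runtime. */
  synchronized void setBandwidthMb(double bandwidthMb) {
    this.bytesPerSecond = bandwidthMb <= 0 ? 0 : bandwidthMb * BYTES_PER_MB;
  }

  /** Blocks the calling copy thread until {@code bytes} may be written. */
  void acquire(long bytes) {
    long waitNanos;
    synchronized (this) {
      if (bytesPerSecond == 0) {
        return;
      }
      long now = System.nanoTime();
      long start = (nextFreeNanos - now > 0) ? nextFreeNanos : now;
      // Reserve the time slot this chunk occupies at the configured rate; all
      // threads share nextFreeNanos, so their combined throughput is capped.
      nextFreeNanos = start + (long) (bytes * 1e9 / bytesPerSecond);
      waitNanos = start - now;
    }
    if (waitNanos > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitNanos);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status for the caller
      }
    }
  }
}
```

Because the limiter is shared, raising {{hbase.replication.bulkload.copy.maxthreads}} changes how the budget is split among threads, not the total.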
> *2. Distributed bulkload event tracking across sink RegionServers*
> {{ReplicationSink}} now coordinates replicated bulkload WAL events through 
> ZooKeeper. The RegionServer service path passes its {{ZKWatcher}} into 
> {{ReplicationSink}}, which creates a {{ReplicationBulkLoadEventTracker}}.
> Each bulkload event is identified by:
> {code}
> replicationClusterId
> table
> encodedRegionName
> bulkloadSeqNum
> {code}
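A minimal Java sketch of turning those four fields into one stable znode name. The encoding (URL-encode each field, then join with commas) is an assumption for illustration; the patch's actual id format may differ. The property that matters is determinism: the same WAL event yields the same id on every RegionServer and on every retry. It needs Java 10+ for {{URLEncoder.encode(String, Charset)}}.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoding of the four-part bulkload event key into a znode name. */
final class BulkLoadEventKey {
  final String replicationClusterId;
  final String table;
  final String encodedRegionName;
  final long bulkloadSeqNum;

  BulkLoadEventKey(String replicationClusterId, String table,
                   String encodedRegionName, long bulkloadSeqNum) {
    this.replicationClusterId = replicationClusterId;
    this.table = table;
    this.encodedRegionName = encodedRegionName;
    this.bulkloadSeqNum = bulkloadSeqNum;
  }

  /**
   * URL-encodes each component, so '/' (illegal in a znode name) and ','
   * (the separator) cannot appear inside one, then joins them with ','.
   */
  String toEventId() {
    return enc(replicationClusterId) + "," + enc(table) + ","
        + enc(encodedRegionName) + "," + bulkloadSeqNum;
  }

  private static String enc(String s) {
    return URLEncoder.encode(s, StandardCharsets.UTF_8);
  }
}
```

For example, cluster id {{peer1}}, table {{ns:tbl}}, region {{abc123}} and sequence number 17 map to {{peer1,ns%3Atbl,abc123,17}}.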
> The event is stored under a bucket derived from the WAL write time, using two 
> ZooKeeper marker trees:
> {code}
> .../replication/bulkload-events/in-progress/<bucket>/<eventId>
> .../replication/bulkload-events/done/<bucket>/<eventId>
> {code}
> The {{in-progress}} marker is ephemeral and represents the RegionServer 
> currently processing the event. The {{done}} marker is persistent and records 
> successful completion.
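One way the two marker paths could be derived is sketched below in Java. The one-hour bucket width and the epoch-millisecond bucket name are assumptions (the description only says the bucket comes from the WAL write time), and {{parentZNode}} is a hypothetical placeholder for the elided "..." prefix. Since a retry carries the same WAL entry, and therefore the same write time, it maps to the same bucket and the same path.

```java
/** Hypothetical mapping from a bulkload event to its two marker paths. */
final class BulkLoadEventPaths {
  static final long BUCKET_WIDTH_MS = 60L * 60 * 1000; // assumed: one hour

  /** Floors the WAL write time to the start of its bucket. */
  static long bucketOf(long walWriteTimeMs) {
    return Math.floorDiv(walWriteTimeMs, BUCKET_WIDTH_MS) * BUCKET_WIDTH_MS;
  }

  static String inProgressPath(String parentZNode, long walWriteTimeMs, String eventId) {
    return marker(parentZNode, "in-progress", walWriteTimeMs, eventId);
  }

  static String donePath(String parentZNode, long walWriteTimeMs, String eventId) {
    return marker(parentZNode, "done", walWriteTimeMs, eventId);
  }

  private static String marker(String parentZNode, String tree, long walWriteTimeMs, String eventId) {
    return parentZNode + "/replication/bulkload-events/" + tree + "/"
        + bucketOf(walWriteTimeMs) + "/" + eventId;
  }
}
```

Grouping markers by time bucket also gives a cleaner a natural unit to scan and delete once a bucket is empty.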
> When a sink receives a bulkload WAL event:
> # If a {{done}} marker already exists, the event is skipped.
> # Otherwise, the sink attempts to create the {{in-progress}} marker.
> # If another RegionServer is already processing the same event, the sink 
> waits for either completion or timeout.
> # After {{HFileReplicator.replicate()}} succeeds, the sink writes the 
> {{done}} marker and releases the {{in-progress}} marker.
> This prevents duplicate HFile loading when a source-side RPC retry is routed 
> to a different target RegionServer after the first target RegionServer 
> already completed the same bulkload event.
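The four steps can be sketched in Java against an in-memory stand-in for ZooKeeper. Everything here is hypothetical: {{MarkerStore}} replaces the real znode trees (a real ephemeral znode would also disappear if its RegionServer died, which these maps do not model), and throwing on timeout so that the source retries later is an assumption, since the description only says the sink waits for completion or timeout.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the two marker trees; putIfAbsent mimics a znode create. */
class MarkerStore {
  final Map<String, String> inProgress = new ConcurrentHashMap<>(); // eventId -> owning RS
  final Map<String, Long> done = new ConcurrentHashMap<>();         // eventId -> completion time
}

/** Sketch of the per-event flow on a sink RegionServer (not the patch's code). */
class BulkLoadEventCoordinator {
  enum Outcome { LOADED, SKIPPED_DONE }

  private final MarkerStore store;
  private final String serverName;

  BulkLoadEventCoordinator(MarkerStore store, String serverName) {
    this.store = store;
    this.serverName = serverName;
  }

  /** If {@code replicate} throws, no done marker is written and the claim is released. */
  Outcome process(String eventId, Runnable replicate, long waitTimeoutMs) {
    long deadline = System.currentTimeMillis() + waitTimeoutMs;
    while (true) {
      // 1. Already completed somewhere in the cluster: skip.
      if (store.done.containsKey(eventId)) {
        return Outcome.SKIPPED_DONE;
      }
      // 2. Try to claim the event.
      if (store.inProgress.putIfAbsent(eventId, serverName) == null) {
        try {
          // Re-check: the previous owner may have finished just before our claim.
          if (store.done.containsKey(eventId)) {
            return Outcome.SKIPPED_DONE;
          }
          replicate.run(); // HFileReplicator.replicate() in the real flow
          // 4. Record success; the finally block then releases the claim.
          store.done.put(eventId, System.currentTimeMillis());
          return Outcome.LOADED;
        } finally {
          store.inProgress.remove(eventId, serverName);
        }
      }
      // 3. Another RS holds the claim: poll until done appears, the claim
      //    vanishes (loop and try to claim), or the wait times out.
      if (System.currentTimeMillis() >= deadline) {
        throw new IllegalStateException("timed out waiting for " + eventId);
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted waiting for " + eventId, e);
      }
    }
  }
}
```

The re-check after a successful claim closes a small window in which another server writes its {{done}} marker and releases its claim between step 1 and step 2. Writing {{done}} before releasing {{in-progress}}, as step 4 describes, is what makes that re-check sufficient.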
> A local in-memory guard remains as a fallback for {{ReplicationSink}} 
> instances constructed without ZooKeeper, but the RegionServer replication 
> service path uses ZooKeeper-backed event tracking.
> *3. Master-side cleanup for completed event markers*
> Adds {{ReplicationBulkLoadEventCleaner}}, a master scheduled chore. It 
> removes expired {{done}} markers only when the matching {{in-progress}} 
> marker no longer exists, then removes empty bucket znodes.
> The cleanup is controlled by:
> {code}
> hbase.master.cleaner.replication.bulkload.event.period.ms
> hbase.replication.bulkload.event.done.ttl.ms
> {code}
> The default completed-marker TTL is one day.
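One pass of the cleaner can be modelled in Java over in-memory maps. This is a hypothetical sketch, not {{ReplicationBulkLoadEventCleaner}}: it assumes a marker's age is measured from its creation time (for example, a znode's ctime) and represents each marker tree as bucket -> event ids.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical single cleaner pass over in-memory marker trees. */
final class BulkLoadEventCleanerSketch {
  /** One day, the documented default completed-marker TTL. */
  static final long DEFAULT_DONE_TTL_MS = 24L * 60 * 60 * 1000;

  /**
   * @param done       bucket -> (eventId -> done-marker creation time in ms)
   * @param inProgress bucket -> eventIds that still have an in-progress marker
   * @return number of done markers removed
   */
  static int runOnce(Map<Long, Map<String, Long>> done,
                     Map<Long, Set<String>> inProgress,
                     long nowMs, long ttlMs) {
    int removed = 0;
    Iterator<Map.Entry<Long, Map<String, Long>>> buckets = done.entrySet().iterator();
    while (buckets.hasNext()) {
      Map.Entry<Long, Map<String, Long>> bucket = buckets.next();
      Set<String> active = inProgress.getOrDefault(bucket.getKey(), Set.of());
      Iterator<Map.Entry<String, Long>> markers = bucket.getValue().entrySet().iterator();
      while (markers.hasNext()) {
        Map.Entry<String, Long> marker = markers.next();
        boolean expired = nowMs - marker.getValue() >= ttlMs;
        // Only expired markers whose in-progress twin is gone are deleted.
        if (expired && !active.contains(marker.getKey())) {
          markers.remove();
          removed++;
        }
      }
      // Drop the done bucket once it has no markers left.
      if (bucket.getValue().isEmpty()) {
        buckets.remove();
      }
    }
    // Drop empty in-progress buckets as well.
    inProgress.entrySet().removeIf(e -> e.getValue().isEmpty());
    return removed;
  }
}
```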
> h2. Test Coverage
> The patch adds coverage for both the throttling and deduplication paths:
> * {{TestHFileReplicatorBandwidth}} verifies bandwidth throttling and dynamic 
> rate-limit updates.
> * {{TestReplicationSinkBulkLoadDedup}} verifies sink-level duplicate handling 
> and completed-marker skipping.
> * {{TestReplicationBulkLoadEventTracker}} verifies ZooKeeper in-progress/done 
> marker behavior and stable event bucket/id generation.
> * {{TestReplicationBulkLoadEventCleaner}} verifies master-side cleanup of 
> expired {{done}} markers while preserving markers with matching 
> {{in-progress}} state.
> * {{TestBulkLoadReplication#testDuplicateBulkLoadWalEventSkippedAcrossTargetRegionServers}} 
> starts MiniCluster RegionServers and replays the exact same bulkload WAL 
> batch first through one target RS sink and then through another, verifying 
> that the second replay is skipped because of the completed marker.
> h2. Steps to Reproduce
> # Set up bulkload replication: cluster A replicates to cluster B.
> # Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or 
> otherwise ensure that the HFile copy takes longer than 
> {{replication.source.shipedits.timeout}}).
> # Perform a large bulkload on cluster A.
> # Observe that cluster B may process the same bulkload WAL entry multiple 
> times, resulting in duplicated store files.
> h2. Expected Behavior
> Once a replicated bulkload WAL event has completed successfully, its HFiles 
> should not be loaded again in the sink cluster, even when the source retries 
> due to an RPC timeout and the retry is routed to another target RegionServer.
> Failed attempts remain retryable; successful attempts are recorded with 
> completed markers until the cleanup TTL expires.
> h2. Actual Behavior
> The same bulkload WAL entry can be processed multiple times, creating 
> duplicate store files and triggering a compaction storm.
> h2. Additional Notes
> * Increasing {{replication.source.shipedits.timeout}} reduces the probability 
> of retry but does not eliminate it and introduces WAL retention overhead.
> * The bandwidth throttling and ZooKeeper event tracking fixes are 
> complementary: throttling reduces the chance of network saturation and source 
> RPC timeout, while event tracking prevents duplicate bulkload execution when 
> a timeout and retry still occur.
> * Pull request: https://github.com/apache/hbase/pull/8380



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
