HBASE-20167 Optimize the implementation of ReplicationSourceWALReader
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b Branch: refs/heads/HBASE-20046-branch-2 Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d Parents: cea5199 Author: zhangduo <zhang...@apache.org> Authored: Mon Mar 12 12:21:44 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Apr 9 15:18:44 2018 +0800 ---------------------------------------------------------------------- .../RecoveredReplicationSource.java | 67 +++++------ .../RecoveredReplicationSourceShipper.java | 48 ++------ .../RecoveredReplicationSourceWALReader.java | 56 ---------- .../regionserver/ReplicationSource.java | 36 +++--- .../regionserver/ReplicationSourceShipper.java | 27 +++-- .../ReplicationSourceWALReader.java | 101 +++++------------ .../SerialReplicationSourceWALReader.java | 112 +++++++++++++++++++ 7 files changed, 218 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index d9506c0..169b469 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) { - final RecoveredReplicationSourceShipper worker = - new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, - this.queueStorage); - ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); - if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); - } else { - LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(this::uncaughtException); - worker.setWALReader( - startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); - } + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + PriorityBlockingQueue<Path> queue) { + return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); + } + + private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, + BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException { + LOG.trace("Didn't read any new entries from WAL"); + // we're done with queue recovery, shut ourself down + reader.setReaderRunning(false); + // shuts down shipper thread immediately + entryBatchQueue.put(new WALEntryBatch(0, currentPath)); } @Override - protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { - ReplicationSourceWALReader walReader = - new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); - Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, - this::uncaughtException); - return walReader; + if (replicationPeer.getPeerConfig().isSerial()) { + return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, + this) { + + @Override + protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); + } + }; + } else { + return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) { + + @Override + protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); + } + }; + } } public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException { @@ -166,21 +176,14 @@ public class RecoveredReplicationSource extends ReplicationSource { return path; } - public void tryFinish() { + void tryFinish() { // use synchronize to make sure one last thread will clean the queue synchronized (workerThreads) { Threads.sleep(100);// wait a short while for other worker thread to fully exit - boolean allTasksDone = true; - for (ReplicationSourceShipper worker : workerThreads.values()) { - if (!worker.isFinished()) { - allTasksDone = false; - break; - } - } + boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished()); if (allTasksDone) { manager.removeRecoveredSource(this); - LOG.info("Finished recovering queue " + queueId + " with the following stats: " - + getStats()); + LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats()); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 9c36497..1ae5cb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -48,46 +48,18 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper } @Override - public void run() { - setWorkerState(WorkerState.RUNNING); - // Loop until we close down - while (isActive()) { - int sleepMultiplier = 1; - // Sleep until replication is enabled again - if (!source.isPeerEnabled()) { - if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - - while (entryReader == null) { - if (source.sleepForRetries("Replication WAL entry reader thread not initialized", - sleepMultiplier)) { - sleepMultiplier++; - } - } - - try { - WALEntryBatch entryBatch = entryReader.take(); - shipEdits(entryBatch); - if (entryBatch.getWalEntries().isEmpty()) { - LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " - + source.getQueueId()); - source.getSourceMetrics().incrCompletedRecoveryQueue(); - setWorkerState(WorkerState.FINISHED); - continue; - } - } catch (InterruptedException e) { - LOG.trace("Interrupted while waiting for next replication entry batch", e); - Thread.currentThread().interrupt(); - } + protected void postShipEdits(WALEntryBatch entryBatch) { + if (entryBatch.getWalEntries().isEmpty()) { + LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + + source.getQueueId()); + source.getSourceMetrics().incrCompletedRecoveryQueue(); + setWorkerState(WorkerState.FINISHED); } + } + + @Override + protected void postFinish() { source.tryFinish(); - // If the worker exits run loop without finishing its task, mark it as stopped. - if (!isFinished()) { - setWorkerState(WorkerState.STOPPED); - } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java deleted file mode 100644 index 114f139..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import java.util.concurrent.PriorityBlockingQueue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.replication.WALEntryFilter; - -/** - * Used by a {@link RecoveredReplicationSourceShipper}. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader { - - private static final Logger LOG = - LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class); - - public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, - ReplicationSource source) { - super(fs, conf, logQueue, startPosition, filter, source); - } - - @Override - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { - LOG.trace("Didn't read any new entries from WAL"); - // we're done with queue recovery, shut ourself down - setReaderRunning(false); - // shuts down shipper thread immediately - entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath)); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f5e4185..3480919 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -85,7 +85,7 @@ public class ReplicationSource implements ReplicationSourceInterface { // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; protected ReplicationQueueStorage queueStorage; - private ReplicationPeer replicationPeer; + protected ReplicationPeer replicationPeer; protected Configuration conf; protected ReplicationQueueInfo replicationQueueInfo; @@ -294,26 +294,32 @@ public class ReplicationSource implements ReplicationSourceInterface { this.walEntryFilter = new ChainWALEntryFilter(filters); } - protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) { - ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this); + private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) { + ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); + LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId); } else { - LOG.debug("Starting up worker for wal group " + walGroupId); + LOG.debug("Starting up worker for wal group {}", walGroupId); + ReplicationSourceWALReader walReader = + createNewWALReader(walGroupId, queue, worker.getStartPosition()); + Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() + + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException); + worker.setWALReader(walReader); worker.startup(this::uncaughtException); - worker.setWALReader( - startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); } } - protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + protected ReplicationSourceShipper createNewShipper(String walGroupId, + PriorityBlockingQueue<Path> queue) { + return new ReplicationSourceShipper(conf, walGroupId, queue, this); + } + + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { - ReplicationSourceWALReader walReader = - new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); - return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId, - this::uncaughtException); + return replicationPeer.getPeerConfig().isSerial() + ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) + : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); } protected final void uncaughtException(Thread t, Throwable e) { @@ -392,10 +398,6 @@ public class ReplicationSource implements ReplicationSourceInterface { return replicationPeer.isPeerEnabled(); } - public boolean isSerial() { - return replicationPeer.getPeerConfig().isSerial(); - } - private void initialize() { int sleepMultiplier = 1; while (this.isSourceActive()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 50aaf95..aa5251e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -83,7 +83,7 @@ public class ReplicationSourceShipper extends Thread { } @Override - public void run() { + public final void run() { setWorkerState(WorkerState.RUNNING); // Loop until we close down while (isActive()) { @@ -95,28 +95,31 @@ public class ReplicationSourceShipper extends Thread { } continue; } - - while (entryReader == null) { - if (sleepForRetries("Replication WAL entry reader thread not initialized", - sleepMultiplier)) { - sleepMultiplier++; - } - } - try { WALEntryBatch entryBatch = entryReader.take(); shipEdits(entryBatch); + postShipEdits(entryBatch); } catch (InterruptedException e) { LOG.trace("Interrupted while waiting for next replication entry batch", e); Thread.currentThread().interrupt(); } } // If the worker exits run loop without finishing its task, mark it as stopped. - if (state != WorkerState.FINISHED) { + if (!isFinished()) { setWorkerState(WorkerState.STOPPED); + } else { + postFinish(); } } + // To be implemented by recovered shipper + protected void postShipEdits(WALEntryBatch entryBatch) { + } + + // To be implemented by recovered shipper + protected void postFinish() { + } + /** * Do the shipping logic */ @@ -229,8 +232,8 @@ public class ReplicationSourceShipper extends Thread { public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); - Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + "," - + source.getQueueId(), handler); + Threads.setDaemonThreadRunning(this, + name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); } public PriorityBlockingQueue<Path> getLogQueue() { http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index da92a09..b125133 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -51,7 +50,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ReplicationSourceWALReader extends Thread { +class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); private final PriorityBlockingQueue<Path> logQueue; @@ -64,28 +63,19 @@ public class ReplicationSourceWALReader extends Thread { // max (heap) size of each batch - multiply by number of batches in queue to get total private final long replicationBatchSizeCapacity; // max count of each batch - multiply by number of batches in queue to get total - protected final int replicationBatchCountCapacity; + private final int replicationBatchCountCapacity; // position in the WAL to start reading at private long currentPosition; private final long sleepForRetries; private final int maxRetriesMultiplier; private final boolean eofAutoRecovery; - // used to store the first cell in an entry before filtering. This is because that if serial - // replication is enabled, we may find out that an entry can not be pushed after filtering. And - // when we try the next time, the cells maybe null since the entry has already been filtered, - // especially for region event wal entries. And this can also used to determine whether we can - // skip filtering. - private Cell firstCellInEntryBeforeFiltering; - //Indicates whether this particular worker is running private boolean isReaderRunning = true; private AtomicLong totalBufferUsed; private long totalBufferQuota; - private final SerialReplicationChecker serialReplicationChecker; - /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -120,7 +110,6 @@ public class ReplicationSourceWALReader extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); - this.serialReplicationChecker = new SerialReplicationChecker(conf, source); LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity @@ -169,75 +158,35 @@ public class ReplicationSourceWALReader extends Thread { } } - private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) - throws IOException { - entryStream.next(); - firstCellInEntryBeforeFiltering = null; - batch.setLastWalPosition(entryStream.getPosition()); + // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. + protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { + WALEdit edit = entry.getEdit(); + if (edit == null || edit.isEmpty()) { + return false; + } + long entrySize = getEntrySize(entry); + batch.addEntry(entry); + updateBatchStats(batch, entry, entrySize); + boolean totalBufferTooLarge = acquireBufferQuota(entrySize); + // Stop if too many entries or too big + return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || + batch.getNbEntries() >= replicationBatchCountCapacity; } - private WALEntryBatch readWALEntries(WALEntryStream entryStream) + protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { if (!entryStream.hasNext()) { return null; } - long positionBefore = entryStream.getPosition(); - WALEntryBatch batch = - new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + WALEntryBatch batch = createBatch(entryStream); do { - Entry entry = entryStream.peek(); - boolean isSerial = source.isSerial(); - boolean doFiltering = true; - if (isSerial) { - if (firstCellInEntryBeforeFiltering == null) { - assert !entry.getEdit().isEmpty() : "should not write empty edits"; - // Used to locate the region record in meta table. In WAL we only have the table name and - // encoded region name which can not be mapping to region name without scanning all the - // records for a table, so we need a start key, just like what we have done at client side - // when locating a region. For the markers, we will use the start key of the region as the - // row key for the edit. And we need to do this before filtering since all the cells may - // be filtered out, especially that for the markers. - firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0); - } else { - // if this is not null then we know that the entry has already been filtered. - doFiltering = false; - } - } - - if (doFiltering) { - entry = filterEntry(entry); - } + Entry entry = entryStream.next(); + batch.setLastWalPosition(entryStream.getPosition()); + entry = filterEntry(entry); if (entry != null) { - if (isSerial) { - if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) { - if (batch.getLastWalPosition() > positionBefore) { - // we have something that can push, break - break; - } else { - serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering); - } - } - // arrive here means we can push the entry, record the last sequence id - batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()), - entry.getKey().getSequenceId()); + if (addEntryToBatch(batch, entry)) { + break; } - // actually remove the entry. - removeEntryFromStream(entryStream, batch); - WALEdit edit = entry.getEdit(); - if (edit != null && !edit.isEmpty()) { - long entrySize = getEntrySize(entry); - batch.addEntry(entry); - updateBatchStats(batch, entry, entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySize); - // Stop if too many entries or too big - if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || - batch.getNbEntries() >= replicationBatchCountCapacity) { - break; - } - } - } else { - // actually remove the entry. - removeEntryFromStream(entryStream, batch); } } while (entryStream.hasNext()); return batch; @@ -286,7 +235,11 @@ public class ReplicationSourceWALReader extends Thread { return true; } - private Entry filterEntry(Entry entry) { + protected final WALEntryBatch createBatch(WALEntryStream entryStream) { + return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + } + + protected final Entry filterEntry(Entry entry) { Entry filtered = filter.filter(entry); if (entry != null && filtered == null) { source.getSourceMetrics().incrLogEditsFiltered(); http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java new file mode 100644 index 0000000..5e9a9f6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * WAL reader for a serial replication peer. + */ +@InterfaceAudience.Private +public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader { + + // used to store the first cell in an entry before filtering. This is because that if serial + // replication is enabled, we may find out that an entry can not be pushed after filtering. And + // when we try the next time, the cells maybe null since the entry has already been filtered, + // especially for region event wal entries. And this can also used to determine whether we can + // skip filtering. + private Cell firstCellInEntryBeforeFiltering; + + private final SerialReplicationChecker checker; + + public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, + PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, + ReplicationSource source) { + super(fs, conf, logQueue, startPosition, filter, source); + checker = new SerialReplicationChecker(conf, source); + } + + @Override + protected WALEntryBatch readWALEntries(WALEntryStream entryStream) + throws IOException, InterruptedException { + if (!entryStream.hasNext()) { + return null; + } + long positionBefore = entryStream.getPosition(); + WALEntryBatch batch = createBatch(entryStream); + do { + Entry entry = entryStream.peek(); + boolean doFiltering = true; + if (firstCellInEntryBeforeFiltering == null) { + assert !entry.getEdit().isEmpty() : "should not write empty edits"; + // Used to locate the region record in meta table. In WAL we only have the table name and + // encoded region name which can not be mapping to region name without scanning all the + // records for a table, so we need a start key, just like what we have done at client side + // when locating a region. For the markers, we will use the start key of the region as the + // row key for the edit. And we need to do this before filtering since all the cells may + // be filtered out, especially that for the markers. + firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0); + } else { + // if this is not null then we know that the entry has already been filtered. + doFiltering = false; + } + + if (doFiltering) { + entry = filterEntry(entry); + } + if (entry != null) { + if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) { + if (batch.getLastWalPosition() > positionBefore) { + // we have something that can push, break + break; + } else { + checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering); + } + } + // arrive here means we can push the entry, record the last sequence id + batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()), + entry.getKey().getSequenceId()); + // actually remove the entry. + removeEntryFromStream(entryStream, batch); + if (addEntryToBatch(batch, entry)) { + break; + } + } else { + // actually remove the entry. + removeEntryFromStream(entryStream, batch); + } + } while (entryStream.hasNext()); + return batch; + } + + private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) + throws IOException { + entryStream.next(); + firstCellInEntryBeforeFiltering = null; + batch.setLastWalPosition(entryStream.getPosition()); + } +}