HBASE-11568 Async WAL replication for region replicas
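This change ships edits from the primary region to its secondary region replicas by tailing the WAL through a replication peer named "region_replica_replication". A minimal usage sketch (assumes "hbase.replication" and "hbase.region.replica.replication.enabled" are both set to true in the cluster configuration; the table and family names and the conf variable are illustrative):

  HBaseAdmin admin = new HBaseAdmin(conf);  // conf: client-side Configuration
  HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  htd.addFamily(new HColumnDescriptor("f"));
  htd.setRegionReplication(3); // 1 primary + 2 secondary replicas, kept up to date asynchronously
  admin.createTable(htd);
  admin.close();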
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e28ec724 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e28ec724 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e28ec724 Branch: refs/heads/master Commit: e28ec72464ccb266c49e5ef42dd87f938e71ffde Parents: d44e7df Author: Enis Soztutar <e...@apache.org> Authored: Tue Aug 19 18:59:22 2014 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Aug 19 18:59:22 2014 -0700 ---------------------------------------------------------------------- .../client/RegionAdminServiceCallable.java | 131 +++++ .../hadoop/hbase/client/RpcRetryingCaller.java | 2 +- .../src/main/resources/hbase-default.xml | 14 + .../master/handler/CreateTableHandler.java | 13 +- .../hbase/protobuf/ReplicationProtbufUtil.java | 19 +- .../hbase/regionserver/RSRpcServices.java | 23 +- .../hbase/regionserver/wal/HLogSplitter.java | 197 ++++--- .../RegionReplicaReplicationEndpoint.java | 558 +++++++++++++++++++ .../regionserver/ReplicationSource.java | 3 +- .../hbase/util/ServerRegionReplicaUtil.java | 51 ++ .../hadoop/hbase/HBaseTestingUtility.java | 35 +- .../hbase/regionserver/TestRegionReplicas.java | 81 +-- .../regionserver/TestRegionServerNoMaster.java | 68 +-- .../hbase/regionserver/wal/TestHLogMethods.java | 27 +- .../TestRegionReplicaReplicationEndpoint.java | 345 ++++++++++++ ...egionReplicaReplicationEndpointNoMaster.java | 264 +++++++++ 16 files changed, 1637 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java new file mode 100644 index 0000000..d114304 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; + +/** + * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable + * assumes a Table and row and thus does region locating similar to RegionServerCallable. + */ +public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { + + protected final ClusterConnection connection; + + protected AdminService.BlockingInterface stub; + + protected HRegionLocation location; + + protected final TableName tableName; + protected final byte[] row; + + protected final static int MIN_WAIT_DEAD_SERVER = 10000; + + public RegionAdminServiceCallable(ClusterConnection connection, TableName tableName, byte[] row) { + this(connection, null, tableName, row); + } + + public RegionAdminServiceCallable(ClusterConnection connection, HRegionLocation location, + TableName tableName, byte[] row) { + this.connection = connection; + this.location = location; + this.tableName = tableName; + this.row = row; + } + + @Override + public void prepare(boolean reload) throws IOException { + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + if (reload || location == null) { + location = getLocation(!reload); + } + + if (location == null) { + // With this exception, there will be a retry. + throw new HBaseIOException(getExceptionMessage()); + } + + this.setStub(connection.getAdmin(location.getServerName())); + } + + protected void setStub(AdminService.BlockingInterface stub) { + this.stub = stub; + } + + public abstract HRegionLocation getLocation(boolean useCache) throws IOException; + + @Override + public void throwable(Throwable t, boolean retrying) { + if (t instanceof SocketTimeoutException || + t instanceof ConnectException || + t instanceof RetriesExhaustedException || + (location != null && getConnection().isDeadServer(location.getServerName()))) { + // if thrown these exceptions, we clear all the cache entries that + // map to that slow/dead server; otherwise, let cache miss and ask + // hbase:meta again to find the new location + if (this.location != null) getConnection().clearCaches(location.getServerName()); + } else if (t instanceof RegionMovedException) { + getConnection().updateCachedLocations(tableName, row, t, location); + } else if (t instanceof NotServingRegionException) { + // Purge cache entries for this specific region from hbase:meta cache + // since we don't call connect(true) when number of retries is 1. + getConnection().deleteCachedRegionLocation(location); + } + } + + /** + * @return {@link HConnection} instance used by this Callable. + */ + HConnection getConnection() { + return this.connection; + } + + //subclasses can override this. 
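+ // (for example, RegionReplicaReplayCallable later in this patch overrides it to add table,
+ // replica and row details to the message)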
+ protected String getExceptionMessage() { + return "There is no location"; + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return null; + } + + @Override + public long sleep(long pause, int tries) { + long sleep = ConnectionUtils.getPauseTime(pause, tries + 1); + if (sleep < MIN_WAIT_DEAD_SERVER + && (location == null || connection.isDeadServer(location.getServerName()))) { + sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); + } + return sleep; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 286b6fe..9e11a27 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -48,7 +48,7 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Private public class RpcRetryingCaller<T> { - static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); + public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); /** * When we started making calls. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 959c4e1..30d50df 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1390,6 +1390,20 @@ possible configurations would overwhelm and obscure the important. </description> </property> <property> + <name>hbase.region.replica.replication.enabled</name> + <value>false</value> + <description> + Whether asynchronous WAL replication to the secondary region replicas is enabled or not. + If this is enabled, a replication peer named "region_replica_replication" will be created + which will tail the logs and replicate the mutations to region replicas for tables that + have region replication > 1. Once this has been enabled, disabling it also requires + disabling the replication peer using the shell or the ReplicationAdmin java class. + Replication to secondary region replicas works over standard inter-cluster replication, + so "hbase.replication" must also be set to true for this feature to work.
+ </description> + </property> + <property> <name>hbase.http.filter.initializers</name> <value>org.apache.hadoop.hbase.http.lib.StaticUserWebFilter</value> <description> http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index fa34ba1..4c20dc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; /** * Handler to create a table. @@ -196,9 +197,8 @@ public class CreateTableHandler extends EventHandler { */ protected void completed(final Throwable exception) { releaseTableLock(); - String msg = exception == null ? null : exception.getMessage(); LOG.info("Table, " + this.hTableDescriptor.getTableName() + ", creation " + - msg == null ? "successful" : "failed. " + msg); + (exception == null ? "successful" : "failed. " + exception)); if (exception != null) { removeEnablingTable(this.assignmentManager, this.hTableDescriptor.getTableName()); } @@ -243,11 +243,16 @@ public class CreateTableHandler extends EventHandler { // 5. Add replicas if needed regionInfos = addReplicas(hTableDescriptor, regionInfos); - // 6. Trigger immediate assignment of the regions in round-robin fashion + // 6. Setup replication for region replicas if needed + if (hTableDescriptor.getRegionReplication() > 1) { + ServerRegionReplicaUtil.setupRegionReplicaReplication(conf); + } + + // 7. Trigger immediate assignment of the regions in round-robin fashion ModifyRegionUtils.assignRegions(assignmentManager, regionInfos); } - // 6. Set table enabled flag up in zk. + // 8. Set table enabled flag up in zk. 
try { assignmentManager.getTableStateManager().setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index fdf4bb3..a50978b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -59,7 +59,7 @@ public class ReplicationProtbufUtil { public static void replicateWALEntry(final AdminService.BlockingInterface admin, final HLog.Entry[] entries) throws IOException { Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = - buildReplicateWALEntryRequest(entries); + buildReplicateWALEntryRequest(entries, null); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); try { admin.replicateWALEntry(controller, p.getFirst()); @@ -77,6 +77,19 @@ public class ReplicationProtbufUtil { */ public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(final HLog.Entry[] entries) { + return buildReplicateWALEntryRequest(entries, null); + } + + /** + * Create a new ReplicateWALEntryRequest from a list of HLog entries + * + * @param entries the HLog entries to be replicated + * @param encodedRegionName alternative region name to use if not null + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values + * found. + */ + public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> + buildReplicateWALEntryRequest(final HLog.Entry[] entries, byte[] encodedRegionName) { // Accumulate all the KVs seen in here. List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length); int size = 0; @@ -91,7 +104,9 @@ public class ReplicationProtbufUtil { WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); HLogKey key = entry.getKey(); keyBuilder.setEncodedRegionName( - ByteStringer.wrap(key.getEncodedRegionName())); + ByteStringer.wrap(encodedRegionName == null + ? 
key.getEncodedRegionName() + : encodedRegionName)); keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName())); keyBuilder.setLogSequenceNumber(key.getLogSeqNum()); keyBuilder.setWriteTime(key.getWriteTime()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 10da06d..0f89a8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; @@ -156,6 +157,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; @@ -1358,13 +1360,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // empty input return ReplicateWALEntryResponse.newBuilder().build(); } - HRegion region = regionServer.getRegionByEncodedName( - entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); + HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); + RegionCoprocessorHost coprocessorHost = + ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) + ? region.getCoprocessorHost() + : null; // do not invoke coprocessors if this is a secondary region replica List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); // when tag is enabled, we need tag replay edits with log sequence number boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3); + + // Skip adding the edits to WAL if this is a secondary region replica + boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); + Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; + for (WALEntry entry : entries) { + if (!regionName.equals(entry.getKey().getEncodedRegionName())) { + throw new NotServingRegionException("Replay request contains entries from multiple " + + "regions. First region:" + regionName.toStringUtf8() + " , other region:" + + entry.getKey().getEncodedRegionName()); + } if (regionServer.nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; @@ -1374,7 +1389,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? 
null : new Pair<HLogKey, WALEdit>(); List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry, - cells, walEntry, needAddReplayTag); + cells, walEntry, needAddReplayTag, durability); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 815730c..2df9f50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -133,6 +133,7 @@ public class HLogSplitter { // Major subcomponents of the split process. // These are separated into inner classes to make testing easier. + PipelineController controller; OutputSink outputSink; EntryBuffers entryBuffers; @@ -141,14 +142,6 @@ public class HLogSplitter { private ZooKeeperWatcher watcher; private CoordinatedStateManager csm; - // If an exception is thrown by one of the other threads, it will be - // stored here. - protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(); - - // Wait/notify for when data has been produced by the reader thread, - // consumed by the reader thread, or an exception occurred - final Object dataAvailable = new Object(); - private MonitoredTask status; // For checking the latest flushed sequence id @@ -184,8 +177,9 @@ public class HLogSplitter { this.sequenceIdChecker = idChecker; this.watcher = zkw; this.csm = csm; + this.controller = new PipelineController(); - entryBuffers = new EntryBuffers( + entryBuffers = new EntryBuffers(controller, this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); @@ -196,13 +190,13 @@ public class HLogSplitter { this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); if (zkw != null && csm != null && this.distributedLogReplay) { - outputSink = new LogReplayOutputSink(numWriterThreads); + outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads); } else { if (this.distributedLogReplay) { LOG.info("ZooKeeperWatcher is passed in as NULL so disable distributedLogReplay."); } this.distributedLogReplay = false; - outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); } } @@ -636,22 +630,6 @@ public class HLogSplitter { } } - private void writerThreadError(Throwable t) { - thrown.compareAndSet(null, t); - } - - /** - * Check for errors in the writer threads. If any is found, rethrow it. - */ - private void checkForErrors() throws IOException { - Throwable thrown = this.thrown.get(); - if (thrown == null) return; - if (thrown instanceof IOException) { - throw new IOException(thrown); - } else { - throw new RuntimeException(thrown); - } - } /** * Create a new {@link Writer} for writing log splits.
*/ @@ -680,13 +658,45 @@ } /** + * Contains some methods to control WAL-entries producer / consumer interactions + */ + public static class PipelineController { + // If an exception is thrown by one of the other threads, it will be + // stored here. + AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(); + + // Wait/notify for when data has been produced by the reader thread, + // consumed by the writer threads, or an exception occurred + public final Object dataAvailable = new Object(); + + void writerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw new IOException(thrown); + } else { + throw new RuntimeException(thrown); + } + } + } + + /** * Class which accumulates edits and separates them into a buffer per region * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses * a predefined threshold. * * Writer threads then pull region-specific buffers from this class. */ - class EntryBuffers { + public static class EntryBuffers { + PipelineController controller; + Map<byte[], RegionEntryBuffer> buffers = new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR); @@ -698,7 +708,8 @@ long totalBuffered = 0; long maxHeapUsage; - EntryBuffers(long maxHeapUsage) { + public EntryBuffers(PipelineController controller, long maxHeapUsage) { + this.controller = controller; this.maxHeapUsage = maxHeapUsage; } @@ -709,7 +720,7 @@ * @throws InterruptedException * @throws IOException */ - void appendEntry(Entry entry) throws InterruptedException, IOException { + public void appendEntry(Entry entry) throws InterruptedException, IOException { HLogKey key = entry.getKey(); RegionEntryBuffer buffer; @@ -724,15 +735,15 @@ } // If we crossed the chunk threshold, wait for more space to be available - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { totalBuffered += incrHeap; - while (totalBuffered > maxHeapUsage && thrown.get() == null) { + while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); - dataAvailable.wait(2000); + controller.dataAvailable.wait(2000); } - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); } - checkForErrors(); + controller.checkForErrors(); } /** @@ -765,16 +776,30 @@ } long size = buffer.heapSize(); - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { totalBuffered -= size; // We may unblock writers - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); } } synchronized boolean isRegionCurrentlyWriting(byte[] region) { return currentlyWriting.contains(region); } + + public void waitUntilDrained() { + synchronized (controller.dataAvailable) { + while (totalBuffered > 0) { + try { + controller.dataAvailable.wait(2000); + } catch (InterruptedException e) { + LOG.warn("Got interrupted while waiting for EntryBuffers to drain"); + Thread.interrupted(); + break; + } + } + } + } } /** * A buffer of some number of edits for a given region. * This accumulates edits and also provides a memory optimization in order to * share a single byte array instance for the table and region name. * Also tracks memory usage of the accumulated edits.
*/ - static class RegionEntryBuffer implements HeapSize { + public static class RegionEntryBuffer implements HeapSize { long heapInBuffer = 0; List<Entry> entryBuffer; TableName tableName; @@ -815,14 +840,30 @@ public class HLogSplitter { public long heapSize() { return heapInBuffer; } + + public byte[] getEncodedRegionName() { + return encodedRegionName; + } + + public List<Entry> getEntryBuffer() { + return entryBuffer; + } + + public TableName getTableName() { + return tableName; + } } - class WriterThread extends Thread { + public static class WriterThread extends Thread { private volatile boolean shouldStop = false; + private PipelineController controller; + private EntryBuffers entryBuffers; private OutputSink outputSink = null; - WriterThread(OutputSink sink, int i) { + WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){ super(Thread.currentThread().getName() + "-Writer-" + i); + this.controller = controller; + this.entryBuffers = entryBuffers; outputSink = sink; } @@ -832,7 +873,7 @@ public class HLogSplitter { doRun(); } catch (Throwable t) { LOG.error("Exiting thread", t); - writerThreadError(t); + controller.writerThreadError(t); } } @@ -842,12 +883,12 @@ public class HLogSplitter { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { // No data currently available, wait on some more to show up - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { if (shouldStop && !this.outputSink.flush()) { return; } try { - dataAvailable.wait(500); + controller.dataAvailable.wait(500); } catch (InterruptedException ie) { if (!shouldStop) { throw new RuntimeException(ie); @@ -871,9 +912,9 @@ public class HLogSplitter { } void finish() { - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { shouldStop = true; - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); } } } @@ -882,7 +923,10 @@ public class HLogSplitter { * The following class is an abstraction class to provide a common interface to support both * existing recovered edits file sink and region server WAL edits replay sink */ - abstract class OutputSink { + public static abstract class OutputSink { + + protected PipelineController controller; + protected EntryBuffers entryBuffers; protected Map<byte[], SinkWriter> writers = Collections .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));; @@ -908,8 +952,10 @@ public class HLogSplitter { protected List<Path> splits = null; - public OutputSink(int numWriters) { + public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { numThreads = numWriters; + this.controller = controller; + this.entryBuffers = entryBuffers; } void setReporter(CancelableProgressable reporter) { @@ -919,9 +965,9 @@ public class HLogSplitter { /** * Start the threads that will pump data from the entryBuffers to the output files. 
*/ - synchronized void startWriterThreads() { + public synchronized void startWriterThreads() { for (int i = 0; i < numThreads; i++) { - WriterThread t = new WriterThread(this, i); + WriterThread t = new WriterThread(controller, entryBuffers, this, i); t.start(); writerThreads.add(t); } @@ -980,34 +1026,34 @@ public class HLogSplitter { throw iie; } } - checkForErrors(); + controller.checkForErrors(); LOG.info("Split writers finished"); return (!progress_failed); } - abstract List<Path> finishWritingAndClose() throws IOException; + public abstract List<Path> finishWritingAndClose() throws IOException; /** * @return a map from encoded region ID to the number of edits written out for that region. */ - abstract Map<byte[], Long> getOutputCounts(); + public abstract Map<byte[], Long> getOutputCounts(); /** * @return number of regions we've recovered */ - abstract int getNumberOfRecoveredRegions(); + public abstract int getNumberOfRecoveredRegions(); /** * @param buffer A WAL Edit Entry * @throws IOException */ - abstract void append(RegionEntryBuffer buffer) throws IOException; + public abstract void append(RegionEntryBuffer buffer) throws IOException; /** * WriterThread call this function to help flush internal remaining edits in buffer before close * @return true when underlying sink has something to flush */ - protected boolean flush() throws IOException { + public boolean flush() throws IOException { return false; } } @@ -1017,13 +1063,14 @@ public class HLogSplitter { */ class LogRecoveredEditsOutputSink extends OutputSink { - public LogRecoveredEditsOutputSink(int numWriters) { + public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers, + int numWriters) { // More threads could potentially write faster at the expense // of causing more disk seeks as the logs are split. // 3. After a certain setting (probably around 3) the // process will be bound on the reader in the current // implementation anyway. - super(numWriters); + super(controller, entryBuffers, numWriters); } /** @@ -1031,7 +1078,7 @@ public class HLogSplitter { * @throws IOException */ @Override - List<Path> finishWritingAndClose() throws IOException { + public List<Path> finishWritingAndClose() throws IOException { boolean isSuccessful = false; List<Path> result = null; try { @@ -1247,7 +1294,7 @@ public class HLogSplitter { } @Override - void append(RegionEntryBuffer buffer) throws IOException { + public void append(RegionEntryBuffer buffer) throws IOException { List<Entry> entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn("got an empty buffer, skipping"); @@ -1287,7 +1334,7 @@ public class HLogSplitter { * @return a map from encoded region ID to the number of edits written out for that region. 
*/ @Override - Map<byte[], Long> getOutputCounts() { + public Map<byte[], Long> getOutputCounts() { TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); synchronized (writers) { for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) { @@ -1298,7 +1345,7 @@ public class HLogSplitter { } @Override - int getNumberOfRecoveredRegions() { + public int getNumberOfRecoveredRegions() { return writers.size(); } } @@ -1306,7 +1353,7 @@ public class HLogSplitter { /** * Class wraps the actual writer which writes data out and related statistics */ - private abstract static class SinkWriter { + public abstract static class SinkWriter { /* Count of edits written to this path */ long editsWritten = 0; /* Number of nanos spent writing to this log */ @@ -1367,16 +1414,18 @@ public class HLogSplitter { private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; private boolean hasEditsInDisablingOrDisabledTables = false; - public LogReplayOutputSink(int numWriters) { - super(numWriters); + public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, + int numWriters) { + super(controller, entryBuffers, numWriters); this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT); - this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); + this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, + entryBuffers, numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); } @Override - void append(RegionEntryBuffer buffer) throws IOException { + public void append(RegionEntryBuffer buffer) throws IOException { List<Entry> entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn("got an empty buffer, skipping"); @@ -1692,7 +1741,7 @@ public class HLogSplitter { } @Override - protected boolean flush() throws IOException { + public boolean flush() throws IOException { String curLoc = null; int curSize = 0; List<Pair<HRegionLocation, HLog.Entry>> curQueue = null; @@ -1712,7 +1761,7 @@ public class HLogSplitter { if (curSize > 0) { this.processWorkItems(curLoc, curQueue); - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); return true; } return false; @@ -1723,7 +1772,7 @@ public class HLogSplitter { } @Override - List<Path> finishWritingAndClose() throws IOException { + public List<Path> finishWritingAndClose() throws IOException { try { if (!finishWriting()) { return null; @@ -1798,7 +1847,7 @@ public class HLogSplitter { } @Override - Map<byte[], Long> getOutputCounts() { + public Map<byte[], Long> getOutputCounts() { TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); synchronized (writers) { for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) { @@ -1809,7 +1858,7 @@ public class HLogSplitter { } @Override - int getNumberOfRecoveredRegions() { + public int getNumberOfRecoveredRegions() { return this.recoveredRegions.size(); } @@ -1945,7 +1994,8 @@ public class HLogSplitter { * @throws IOException */ public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells, - Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException { + Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability) + throws IOException { if (entry == null) { // return an empty array @@ -1998,6 +2048,9 @@ public class HLogSplitter { } ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell)); } + if (m != null) { + m.setDurability(durability); + } 
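+ // durability is SKIP_WAL when the caller is replaying edits into a secondary region replica
+ // (see the RSRpcServices.replay() change above), so replayed edits are not re-appended to the WAL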
previousCell = cell; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java new file mode 100644 index 0000000..4a25c44 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -0,0 +1,558 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.RetryingCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.PipelineController; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.SinkWriter; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.OutputSink; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.protobuf.ServiceException; + +/** + * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the + * WAL, and sends the edits to replicas of regions. + */ +@InterfaceAudience.Private +public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { + + private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class); + + private Configuration conf; + private ClusterConnection connection; + + // Reuse HLogSplitter constructs as a WAL pipe + private PipelineController controller; + private RegionReplicaOutputSink outputSink; + private EntryBuffers entryBuffers; + + // Number of writer threads + private int numWriterThreads; + + private int operationTimeout; + + private ExecutorService pool; + + @Override + public void init(Context context) throws IOException { + super.init(context); + + this.conf = HBaseConfiguration.create(context.getConfiguration()); + + String codecClassName = conf + .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); + conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); + + this.numWriterThreads = this.conf.getInt( + "hbase.region.replica.replication.writer.threads", 3); + controller = new PipelineController(); + entryBuffers = new EntryBuffers(controller, + this.conf.getInt("hbase.region.replica.replication.buffersize", + 128*1024*1024)); + + // use the regular RPC timeout for replica replication RPC's + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + + @Override + protected void doStart() { + try { + connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration()); + this.pool = getDefaultThreadPool(conf); + outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool, + numWriterThreads, operationTimeout); + outputSink.startWriterThreads(); + super.doStart(); + } catch (IOException ex) { + LOG.warn("Received exception while creating connection :" + ex); + notifyFailed(ex); + } + } + + @Override + protected void doStop() { + if (outputSink != null) { + try { + outputSink.finishWritingAndClose(); + } catch (IOException ex) { + LOG.warn("Got exception while trying to close OutputSink"); + LOG.warn(ex); + } + } + if (this.pool != null) { + this.pool.shutdownNow(); + try { + // wait for 10 sec + boolean shutdown = 
this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS); + if (!shutdown) { + LOG.warn("Failed to shutdown the thread pool after 10 seconds"); + } + } catch (InterruptedException e) { + LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e); + } + } + if (connection != null) { + try { + connection.close(); + } catch (IOException ex) { + LOG.warn("Got exception closing connection :" + ex); + } + } + super.doStop(); + } + + /** + * Returns a Thread pool for the RPC's to region replicas. Similar to + * Connection's thread pool. + */ + private ExecutorService getDefaultThreadPool(Configuration conf) { + int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); + int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); + LinkedBlockingQueue<Runnable> workQueue = + new LinkedBlockingQueue<Runnable>(maxThreads * + conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + Threads.newDaemonThreadFactory(this.getClass().toString() + "-rpc-shared-")); + tpe.allowCoreThreadTimeOut(true); + return tpe; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + /* A note on batching in RegionReplicaReplicationEndpoint (RRRE): + * + * RRRE relies on batching from two different mechanisms. The first is the batching from + * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single + * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most + * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing). + * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits + * to the HLogSplitter.EntryBuffers which is a blocking buffer space of up to + * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits + * based on regions. + * + * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which + * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink). + * The SinkWriter in this case will send the wal edits to all secondary region replicas in + * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is + * being written to the sink, another buffer for the same region will not be made available to + * writers ensuring regions edits are not replayed out of order. + * + * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so + * that the replication can assume all edits are persisted. We may be able to do a better + * pipelining between the replication thread and output sinks later if it becomes a bottleneck. 
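+ *
+ * A minimal tuning sketch (both keys are read in init() above; the values shown are simply
+ * the defaults, not recommendations):
+ *   conf.setInt("hbase.region.replica.replication.writer.threads", 3);
+ *   conf.setInt("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024);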
+ */ + + while (this.isRunning()) { + try { + for (Entry entry: replicateContext.getEntries()) { + entryBuffers.appendEntry(entry); + } + outputSink.flush(); // make sure everything is flushed + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (IOException e) { + LOG.warn("Received IOException while trying to replicate: " + + StringUtils.stringifyException(e)); + } + } + + return false; + } + + @Override + public boolean canReplicateToSameCluster() { + return true; + } + + @Override + protected WALEntryFilter getScopeWALEntryFilter() { + // we do not care about scope. We replicate everything. + return null; + } + + static class RegionReplicaOutputSink extends OutputSink { + private RegionReplicaSinkWriter sinkWriter; + + public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers, + ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) { + super(controller, entryBuffers, numWriters); + this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout); + } + + @Override + public void append(RegionEntryBuffer buffer) throws IOException { + List<Entry> entries = buffer.getEntryBuffer(); + + if (entries.isEmpty() || entries.get(0).getEdit().getKeyValues().isEmpty()) { + return; + } + + sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), + entries.get(0).getEdit().getKeyValues().get(0).getRow(), entries); + } + + @Override + public boolean flush() throws IOException { + // nothing much to do for now. Wait for the Writer threads to finish up + // append()'ing the data. + entryBuffers.waitUntilDrained(); + return super.flush(); + } + + @Override + public List<Path> finishWritingAndClose() throws IOException { + finishWriting(); + return null; + } + + @Override + public Map<byte[], Long> getOutputCounts() { + return null; // only used in tests + } + + @Override + public int getNumberOfRecoveredRegions() { + return 0; + } + + AtomicLong getSkippedEditsCounter() { + return skippedEdits; + } + } + + static class RegionReplicaSinkWriter extends SinkWriter { + RegionReplicaOutputSink sink; + ClusterConnection connection; + RpcControllerFactory rpcControllerFactory; + RpcRetryingCallerFactory rpcRetryingCallerFactory; + int operationTimeout; + ExecutorService pool; + Cache<TableName, Boolean> disabledAndDroppedTables; + + public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, + ExecutorService pool, int operationTimeout) { + this.sink = sink; + this.connection = connection; + this.operationTimeout = operationTimeout; + this.rpcRetryingCallerFactory + = RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); + this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); + this.pool = pool; + + int nonExistentTableCacheExpiryMs = connection.getConfiguration() + .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); + // A cache for non existing tables that have a default expiry of 5 sec. This means that if the + // table is created again with the same name, we might fail to replicate for that amount of + // time. But this cache prevents overloading meta requests for every edit from a dropped table.
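+ // For example, with the default 5000 ms expiry, edits for a just-dropped table are skipped
+ // (and counted in skippedEdits) for up to ~5 seconds before meta is consulted again.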
+ disabledAndDroppedTables = CacheBuilder.newBuilder() + .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS) + .initialCapacity(10) + .maximumSize(1000) + .build(); + } + + public void append(TableName tableName, byte[] encodedRegionName, byte[] row, + List<Entry> entries) throws IOException { + + if (disabledAndDroppedTables.getIfPresent(tableName) != null) { + sink.getSkippedEditsCounter().incrementAndGet(); + return; + } + + // get the replicas of the primary region + RegionLocations locations = null; + try { + locations = getRegionLocations(connection, tableName, row, true, 0); + + if (locations == null) { + throw new HBaseIOException("Cannot locate locations for " + + tableName + ", row:" + Bytes.toStringBinary(row)); + } + } catch (TableNotFoundException e) { + disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored + // skip this entry + sink.getSkippedEditsCounter().addAndGet(entries.size()); + return; + } + + if (locations.size() == 1) { + return; + } + + ArrayList<Future<ReplicateWALEntryResponse>> tasks + = new ArrayList<Future<ReplicateWALEntryResponse>>(2); + + // check whether we should still replay this entry. If the regions are changed, or the + // entry is not coming from the primary region, filter it out. + HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); + if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), + encodedRegionName)) { + sink.getSkippedEditsCounter().addAndGet(entries.size()); + return; + } + + + // All passed entries should belong to one region because it is coming from the EntryBuffers + // split per region. But the regions might split and merge (unlike log recovery case). + for (int replicaId = 0; replicaId < locations.size(); replicaId++) { + HRegionLocation location = locations.getRegionLocation(replicaId); + if (!RegionReplicaUtil.isDefaultReplica(replicaId)) { + HRegionInfo regionInfo = location == null + ? RegionReplicaUtil.getRegionInfoForReplica( + locations.getDefaultRegionLocation().getRegionInfo(), replicaId) + : location.getRegionInfo(); + RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, + rpcControllerFactory, tableName, location, regionInfo, row, entries, + sink.getSkippedEditsCounter()); + Future<ReplicateWALEntryResponse> task = pool.submit( + new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory, + callable, operationTimeout)); + tasks.add(task); + } + } + + boolean tasksCancelled = false; + for (Future<ReplicateWALEntryResponse> task : tasks) { + try { + task.get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + // The table can be disabled or dropped at this time. For disabled tables, we have no + // cheap mechanism to detect this case because meta does not contain this information. + // HConnection.isTableDisabled() is a zk call which we cannot do for every replay RPC. + // So instead we start the replay RPC with retries and + // check whether the table is dropped or disabled which might cause + // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE. + if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) { + disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+ if (!tasksCancelled) { + sink.getSkippedEditsCounter().addAndGet(entries.size()); + tasksCancelled = true; // so that we do not add to skipped counter again + } + continue; + } + // otherwise rethrow + throw (IOException)cause; + } + // unexpected exception + throw new IOException(cause); + } + } + } + } + + static class RetryingRpcCallable<V> implements Callable<V> { + RpcRetryingCallerFactory factory; + RetryingCallable<V> callable; + int timeout; + public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable, + int timeout) { + this.factory = factory; + this.callable = callable; + this.timeout = timeout; + } + @Override + public V call() throws Exception { + return factory.<V>newCaller().callWithRetries(callable, timeout); + } + } + + /** + * Calls replay on the passed edits for the given set of entries belonging to the region. It skips + * the entry if the region boundaries have changed or the region is gone. + */ + static class RegionReplicaReplayCallable + extends RegionAdminServiceCallable<ReplicateWALEntryResponse> { + // replicaId of the region replica that we want to replicate to + private final int replicaId; + + private final List<HLog.Entry> entries; + private final byte[] initialEncodedRegionName; + private final AtomicLong skippedEntries; + private final RpcControllerFactory rpcControllerFactory; + private boolean skip; + + public RegionReplicaReplayCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, TableName tableName, + HRegionLocation location, HRegionInfo regionInfo, byte[] row, List<HLog.Entry> entries, + AtomicLong skippedEntries) { + super(connection, location, tableName, row); + this.replicaId = regionInfo.getReplicaId(); + this.entries = entries; + this.rpcControllerFactory = rpcControllerFactory; + this.skippedEntries = skippedEntries; + this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); + } + + @Override + public HRegionLocation getLocation(boolean useCache) throws IOException { + RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId); + if (rl == null) { + throw new HBaseIOException(getExceptionMessage()); + } + location = rl.getRegionLocation(replicaId); + if (location == null) { + throw new HBaseIOException(getExceptionMessage()); + } + + // check whether we should still replay this entry. If the regions are changed, or the + // entry is not coming from the primary region, filter it out because we do not need it.
+ // Regions can change because of (1) region split (2) region merge (3) table recreated + if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), + initialEncodedRegionName)) { + skip = true; + return null; + } + + return location; + } + + @Override + public ReplicateWALEntryResponse call(int timeout) throws IOException { + return replayToServer(this.entries, timeout); + } + + private ReplicateWALEntryResponse replayToServer(List<HLog.Entry> entries, int timeout) + throws IOException { + if (entries.isEmpty() || skip) { + skippedEntries.incrementAndGet(); + return ReplicateWALEntryResponse.newBuilder().build(); + } + + HLog.Entry[] entriesArray = new HLog.Entry[entries.size()]; + entriesArray = entries.toArray(entriesArray); + + // set the region name for the target region replica + Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = + ReplicationProtbufUtil.buildReplicateWALEntryRequest( + entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + controller.setCallTimeout(timeout); + controller.setPriority(tableName); + return stub.replay(controller, p.getFirst()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + @Override + protected String getExceptionMessage() { + return super.getExceptionMessage() + " table=" + tableName + + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row); + } + } + + private static RegionLocations getRegionLocations( + ClusterConnection connection, TableName tableName, byte[] row, + boolean useCache, int replicaId) + throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { + RegionLocations rl; + try { + rl = connection.locateRegion(tableName, row, useCache, true, replicaId); + } catch (DoNotRetryIOException e) { + throw e; + } catch (RetriesExhaustedException e) { + throw e; + } catch (InterruptedIOException e) { + throw e; + } catch (IOException e) { + throw new RetriesExhaustedException("Can't get the location", e); + } + if (rl == null) { + throw new RetriesExhaustedException("Can't get the locations"); + } + + return rl; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 87cbcc6..22c80be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -706,7 +706,8 @@ public class ReplicationSource extends Thread } break; } catch (Exception ex) { - LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex); + LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + + org.apache.hadoop.util.StringUtils.stringifyException(ex)); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 74a74a1..199f45e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -25,10 +25,15 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; /** * Similar to {@link RegionReplicaUtil} but for the server side @@ -36,6 +41,21 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; public class ServerRegionReplicaUtil extends RegionReplicaUtil { /** + * Whether asynchronous WAL replication to the secondary region replicas is enabled or not. + * If this is enabled, a replication peer named "region_replica_replication" will be created + * which will tail the logs and replicate the mutations to region replicas for tables that + * have region replication > 1. Once this has been enabled, disabling it also requires + * disabling the replication peer using the shell or the ReplicationAdmin java class. + * Replication to secondary region replicas works over standard inter-cluster replication, + * so "hbase.replication" must also be set to true for this feature to work. + */ + public static final String REGION_REPLICA_REPLICATION_CONF_KEY + = "hbase.region.replica.replication.enabled"; + private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false; + private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + + /** * Returns the regionInfo object to use for interacting with the file system. * @return An HRegionInfo object to interact with the filesystem */ @@ -96,4 +116,35 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { return new StoreFileInfo(conf, fs, status, link); } + /** + * Create the replication peer for replicating to region replicas if needed.
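+ * <p>Safe to call more than once: the peer is only added when it is not already present.
+ * A minimal caller sketch (CreateTableHandler above does exactly this when a table is created
+ * with region replication > 1):
+ * <pre>
+ * if (htd.getRegionReplication() > 1) {
+ *   ServerRegionReplicaUtil.setupRegionReplicaReplication(conf);
+ * }
+ * </pre>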
+   * @param conf configuration to use
+   * @throws IOException if the replication peer cannot be created
+   */
+  public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
+    if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION)) {
+      return;
+    }
+    ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
+    try {
+      if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
+        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+        peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
+        peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
+        repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
+      }
+    } catch (ReplicationException ex) {
+      throw new IOException(ex);
+    } finally {
+      repAdmin.close();
+    }
+  }
+
+  /**
+   * Return the peer id used for replicating to secondary region replicas
+   */
+  public static String getReplicationPeerId() {
+    return REGION_REPLICA_REPLICATION_PEER;
+  }
+
 }
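As the javadoc above warns, the peer outlives the configuration flag: once created, turning the feature off also means disabling (or removing) the "region_replica_replication" peer, e.g. with the shell's disable_peer / remove_peer commands. A minimal sketch of the ReplicationAdmin route, mirroring setupRegionReplicaReplication() above; error handling is omitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class DisableRegionReplicaReplicationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
    try {
      // Same peer id that setupRegionReplicaReplication() registers above.
      repAdmin.disablePeer("region_replica_replication");
      // Or drop it for good: repAdmin.removePeer("region_replica_replication");
    } finally {
      repAdmin.close();
    }
  }
}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java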
index 7aeb778..bedd480 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1532,6 +1534,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     getHBaseAdmin().deleteTable(tableName);
   }
 
+  /**
+   * Drop the given table, ignoring the case where it does not exist
+   * @param tableName table to drop if present
+   */
+  public void deleteTableIfAny(TableName tableName) throws IOException {
+    try {
+      deleteTable(tableName);
+    } catch (TableNotFoundException e) {
+      // ignore
+    }
+  }
+
   // ==========================================================================
   // Canned table and table descriptor creation
   // TODO replace HBaseTestCase
@@ -1846,7 +1860,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     return rowCount;
   }
 
-  public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
+  public void loadNumericRows(final HTableInterface t, final byte[] f, int startRow, int endRow)
+      throws IOException {
     for (int i = startRow; i < endRow; i++) {
       byte[] data = Bytes.toBytes(String.valueOf(i));
       Put put = new Put(data);
@@ -1855,7 +1870,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     }
   }
 
-  public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
+  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
+      throws IOException {
+    for (int i = startRow; i < endRow; i++) {
+      String failMsg = "Failed verification of row: " + i;
+      byte[] data = Bytes.toBytes(String.valueOf(i));
+      Result result = region.get(new Get(data));
+      assertTrue(failMsg, result.containsColumn(f, null));
+      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
+      Cell cell = result.getColumnLatestCell(f, null);
+      assertTrue(failMsg,
+        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
+          cell.getValueLength()));
+    }
+  }
+
+  public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
+      throws IOException {
     for (int i = startRow; i < endRow; i++) {
       byte[] data = Bytes.toBytes(String.valueOf(i));
       Delete delete = new Delete(data);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index d45cda3..dd90136 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TestMetaTableAccessor;
 import org.apache.hadoop.hbase.client.Consistency;
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -102,53 +101,9 @@ public class TestRegionReplicas {
     return HTU.getMiniHBaseCluster().getRegionServer(0);
   }
 
-  private void openRegion(HRegionInfo hri) throws Exception {
-    // first version is '0'
-    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null, null);
-    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
-    Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
-    Assert.assertTrue(responseOpen.getOpeningState(0).
-      equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
-    checkRegionIsOpened(hri.getEncodedName());
-  }
-
-  private void closeRegion(HRegionInfo hri) throws Exception {
-    AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(),
-      hri.getEncodedName());
-    AdminProtos.CloseRegionResponse responseClose = getRS().getRSRpcServices().closeRegion(null, crr);
-    Assert.assertTrue(responseClose.getClosed());
-
-    checkRegionIsClosed(hri.getEncodedName());
-  }
-
-  private void checkRegionIsOpened(String encodedRegionName) throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
-      Thread.sleep(1);
-    }
-
-    Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
-  }
-
-
-  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
-      Thread.sleep(1);
-    }
-
-    try {
-      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
-    } catch (NotServingRegionException expected) {
-      // That's how it work: if the region is closed we have an exception.
-    }
-
-    // We don't delete the znode here, because there is not always a znode.
-  }
-
   @Test(timeout = 60000)
   public void testOpenRegionReplica() throws Exception {
-    openRegion(hriSecondary);
+    openRegion(HTU, getRS(), hriSecondary);
     try {
       //load some data to primary
       HTU.loadNumericRows(table, f, 0, 1000);
@@ -157,14 +112,14 @@ public class TestRegionReplicas {
       Assert.assertEquals(1000, HTU.countRows(table));
     } finally {
       HTU.deleteNumericRows(table, f, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
   /** Tests that the meta location is saved for secondary regions */
   @Test(timeout = 60000)
   public void testRegionReplicaUpdatesMetaLocation() throws Exception {
-    openRegion(hriSecondary);
+    openRegion(HTU, getRS(), hriSecondary);
     HTable meta = null;
     try {
       meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME);
@@ -172,7 +127,7 @@ public class TestRegionReplicas {
         , getRS().getServerName(), -1, 1, false);
     } finally {
       if (meta != null ) meta.close();
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -186,7 +141,7 @@ public class TestRegionReplicas {
     // flush so that region replica can read
     getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
 
-    openRegion(hriSecondary);
+    openRegion(HTU, getRS(), hriSecondary);
 
     // first try directly against region
     HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
@@ -195,7 +150,7 @@ public class TestRegionReplicas {
       assertGetRpc(hriSecondary, 42, true);
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -209,7 +164,7 @@ public class TestRegionReplicas {
     // flush so that region replica can read
     getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
 
-    openRegion(hriSecondary);
+    openRegion(HTU, getRS(), hriSecondary);
 
     // try directly Get against region replica
     byte[] row = Bytes.toBytes(String.valueOf(42));
@@ -220,7 +175,7 @@ public class TestRegionReplicas {
       Assert.assertArrayEquals(row, result.getValue(f, null));
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -236,7 +191,8 @@
   }
 
   // build a mock rpc
-  private void assertGetRpc(HRegionInfo info, int value, boolean expect) throws IOException, ServiceException {
+  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
+      throws IOException, ServiceException {
     byte[] row = Bytes.toBytes(String.valueOf(value));
     Get get = new Get(row);
     ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
@@ -259,13 +215,14 @@ public class TestRegionReplicas {
     // enable store file refreshing
     final int refreshPeriod = 2000; // 2 sec
     HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
-    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
+    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+      refreshPeriod);
     // restart the region server so that it starts the refresher chore
     restartRegionServer();
 
     try {
       LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
-      openRegion(hriSecondary);
+      openRegion(HTU, getRS(), hriSecondary);
 
       //load some data to primary
       LOG.info("Loading data to primary region");
@@ -321,7 +278,7 @@ public class TestRegionReplicas {
 
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 
@@ -338,7 +295,7 @@ public class TestRegionReplicas {
     final int startKey = 0, endKey = 1000;
 
     try {
-      openRegion(hriSecondary);
+      openRegion(HTU, getRS(), hriSecondary);
 
       //load some data to primary so that reader won't fail
       HTU.loadNumericRows(table, f, startKey, endKey);
@@ -402,13 +359,13 @@ public class TestRegionReplicas {
           // whether to do a close and open
           if (random.nextInt(10) == 0) {
             try {
-              closeRegion(hriSecondary);
+              closeRegion(HTU, getRS(), hriSecondary);
             } catch (Exception ex) {
               LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
               exceptions[2].compareAndSet(null, ex);
             }
             try {
-              openRegion(hriSecondary);
+              openRegion(HTU, getRS(), hriSecondary);
             } catch (Exception ex) {
               LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
               exceptions[2].compareAndSet(null, ex);
@@ -443,7 +400,7 @@ public class TestRegionReplicas {
 
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
-      closeRegion(hriSecondary);
+      closeRegion(HTU, getRS(), hriSecondary);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 3164f1d..ed5dfbf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -147,49 +147,55 @@ public class TestRegionServerNoMaster {
   }
 
-  /**
-   * Reopen the region. Reused in multiple tests as we always leave the region open after a test.
-   */
-  private void reopenRegion() throws Exception {
+  public static void openRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
+      throws Exception {
     AdminProtos.OpenRegionRequest orr =
-      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null, null);
-    AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
+      RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null, null);
+    AdminProtos.OpenRegionResponse responseOpen = rs.rpcServices.openRegion(null, orr);
+
     Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
     Assert.assertTrue(responseOpen.getOpeningState(0).
       equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
-    checkRegionIsOpened();
+    checkRegionIsOpened(HTU, rs, hri);
   }
 
-  private void checkRegionIsOpened() throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+  public static void checkRegionIsOpened(HBaseTestingUtility HTU, HRegionServer rs,
+      HRegionInfo hri) throws Exception {
+    while (!rs.getRegionsInTransitionInRS().isEmpty()) {
       Thread.sleep(1);
     }
 
-    Assert.assertTrue(getRS().getRegion(regionName).isAvailable());
+    Assert.assertTrue(rs.getRegion(hri.getRegionName()).isAvailable());
   }
 
+  public static void closeRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
+      throws Exception {
+    AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
+      rs.getServerName(), hri.getEncodedName());
+    AdminProtos.CloseRegionResponse responseClose = rs.rpcServices.closeRegion(null, crr);
+    Assert.assertTrue(responseClose.getClosed());
+    checkRegionIsClosed(HTU, rs, hri);
+  }
 
-  private void checkRegionIsClosed() throws Exception {
-
-    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+  public static void checkRegionIsClosed(HBaseTestingUtility HTU, HRegionServer rs,
+      HRegionInfo hri) throws Exception {
+    while (!rs.getRegionsInTransitionInRS().isEmpty()) {
       Thread.sleep(1);
     }
 
     try {
-      Assert.assertFalse(getRS().getRegion(regionName).isAvailable());
+      Assert.assertFalse(rs.getRegion(hri.getRegionName()).isAvailable());
     } catch (NotServingRegionException expected) {
       // That's how it work: if the region is closed we have an exception.
     }
   }
 
-
   /**
    * Close the region without using ZK
    */
-  private void closeNoZK() throws Exception {
+  private void closeRegionNoZK() throws Exception {
     // no transition in ZK
     AdminProtos.CloseRegionRequest crr =
       RequestConverter.buildCloseRegionRequest(getRS().getServerName(), regionName);
@@ -197,14 +203,14 @@ public class TestRegionServerNoMaster {
     Assert.assertTrue(responseClose.getClosed());
 
     // now waiting & checking. After a while, the transition should be done and the region closed
-    checkRegionIsClosed();
+    checkRegionIsClosed(HTU, getRS(), hri);
   }
 
   @Test(timeout = 60000)
   public void testCloseByRegionServer() throws Exception {
-    closeNoZK();
-    reopenRegion();
+    closeRegionNoZK();
+    openRegion(HTU, getRS(), hri);
   }
 
   /**
@@ -221,8 +227,8 @@ public class TestRegionServerNoMaster {
   public void testMultipleOpen() throws Exception {
 
     // We close
-    closeNoZK();
-    checkRegionIsClosed();
+    closeRegionNoZK();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
     // We're sending multiple requests in a row. The region server must handle this nicely.
     for (int i = 0; i < 10; i++) {
@@ -238,7 +244,7 @@
       );
     }
 
-    checkRegionIsOpened();
+    checkRegionIsOpened(HTU, getRS(), hri);
   }
 
   @Test
@@ -279,9 +285,9 @@
       }
     }
 
-    checkRegionIsClosed();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
-    reopenRegion();
+    openRegion(HTU, getRS(), hri);
   }
 
   /**
@@ -290,8 +296,8 @@
   @Test(timeout = 60000)
   public void testCancelOpeningWithoutZK() throws Exception {
     // We close
-    closeNoZK();
-    checkRegionIsClosed();
+    closeRegionNoZK();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
     // Let do the initial steps, without having a handler
     getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
@@ -315,9 +321,9 @@
     getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd));
 
     // The open handler should have removed the region from RIT but kept the region closed
-    checkRegionIsClosed();
+    checkRegionIsClosed(HTU, getRS(), hri);
 
-    reopenRegion();
+    openRegion(HTU, getRS(), hri);
   }
 
   /**
@@ -341,7 +347,7 @@
     }
 
     //actual close
-    closeNoZK();
+    closeRegionNoZK();
     try {
       AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
         earlierServerName, hri, null, null);
@@ -351,7 +357,7 @@
       Assert.assertTrue(se.getCause() instanceof IOException);
       Assert.assertTrue(se.getCause().getMessage().contains("This RPC was intended for a different server"));
     } finally {
-      reopenRegion();
+      openRegion(HTU, getRS(), hri);
    }
  }
}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index cdf71f6..0718f48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -30,13 +30,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.PipelineController;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.mockito.Mockito.mock;
-
 /**
  * Simple testing of a few HLog methods.
  */
@@ -45,7 +44,7 @@ public class TestHLogMethods {
   private static final byte[] TEST_REGION = Bytes.toBytes("test_region");;
   private static final TableName TEST_TABLE = TableName.valueOf("test_table");
-  
+
   private final HBaseTestingUtility util = new HBaseTestingUtility();
 
   /**
@@ -108,27 +107,25 @@ public class TestHLogMethods {
     reb.appendEntry(createTestLogEntry(1));
     assertTrue(reb.heapSize() > 0);
   }
-  
+
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
-    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
-    HLogSplitter splitter = new HLogSplitter(
-      conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
-    EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+    EntryBuffers sink = new EntryBuffers(new PipelineController(), 1*1024*1024);
     for (int i = 0; i < 1000; i++) {
       HLog.Entry entry = createTestLogEntry(i);
       sink.appendEntry(entry);
     }
-    
+
     assertTrue(sink.totalBuffered > 0);
     long amountInChunk = sink.totalBuffered;
     // Get a chunk
     RegionEntryBuffer chunk = sink.getChunkToWrite();
     assertEquals(chunk.heapSize(), amountInChunk);
-    
+
     // Make sure it got marked that a thread is "working on this"
     assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
 
     // Insert some more entries
     for (int i = 0; i < 500; i++) {
       HLog.Entry entry = createTestLogEntry(i);
       sink.appendEntry(entry);
-    }  
+    }
     // Asking for another chunk shouldn't work since the first one
     // is still writing
     assertNull(sink.getChunkToWrite());
-    
+
     // If we say we're done writing the first chunk, then we should be able
     // to get the second
     sink.doneWriting(chunk);
-    
+
     RegionEntryBuffer chunk2 = sink.getChunkToWrite();
     assertNotNull(chunk2);
     assertNotSame(chunk, chunk2);
     long amountInChunk2 = sink.totalBuffered;
     // The second chunk had fewer rows than the first
     assertTrue(amountInChunk2 < amountInChunk);
-    
+
     sink.doneWriting(chunk2);
     assertEquals(0, sink.totalBuffered);
   }
-  
+
   private HLog.Entry createTestLogEntry(int i) {
     long seq = i;
     long now = i * 1000;