http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 457d859..db98083 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -85,17 +85,16 @@ public interface WALActionsListener { ); /** - * * @param htd * @param logKey - * @param logEdit - * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} - * It only exists to get scope when replicating. Scope should be in the WALKey and not need - * us passing in a <code>htd</code>. + * @param logEdit TODO: Retire this in favor of + * {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get + * scope when replicating. Scope should be in the WALKey and not need us passing in a + * <code>htd</code>. + * @throws IOException If failed to parse the WALEdit */ - void visitLogEntryBeforeWrite( - HTableDescriptor htd, WALKey logKey, WALEdit logEdit - ); + void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) + throws IOException; /** * For notification post append to the writer. Used by metrics system at least. @@ -136,7 +135,9 @@ public interface WALActionsListener { public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {} @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {} + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) + throws IOException { + } @Override public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
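The hunk above makes visitLogEntryBeforeWrite(HTableDescriptor, WALKey, WALEdit) a checked hook: implementations may now throw IOException instead of swallowing parse failures (for example while reading a bulk load descriptor out of the WALEdit). A minimal sketch of a listener written against the APIs shown in this diff; the class name and the empty-edit check are illustrative only and are not part of the patch:

import java.io.IOException;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;

public class ParsingWALListener extends WALActionsListener.Base {
  @Override
  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
      throws IOException {
    // After this patch the hook can propagate failures to the WAL append path
    // rather than logging and continuing.
    if (logEdit.isEmpty()) {
      throw new IOException("Refusing to scope an empty WALEdit for " + logKey);
    }
  }
}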
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 3501f3e..f97ec15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -18,13 +18,21 @@ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.NavigableMap; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.wal.WAL.Entry; /** @@ -32,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; */ @InterfaceAudience.Private public class ScopeWALEntryFilter implements WALEntryFilter { + private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class); @Override public Entry filter(Entry entry) { @@ -41,13 +50,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter { } ArrayList<Cell> cells = entry.getEdit().getCells(); int size = cells.size(); + byte[] fam; for (int i = size - 1; i >= 0; i--) { Cell cell = cells.get(i); - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - byte[] fam = CellUtil.cloneFamily(cell); - if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - cells.remove(i); + // If a bulk load entry has a scope then that means user has enabled replication for bulk load + // hfiles. + // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so + // cannot refactor into one now, can revisit and see if any way to unify them. 
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell); + if (filteredBulkLoadEntryCell != null) { + cells.set(i, filteredBulkLoadEntryCell); + } else { + cells.remove(i); + } + } else { + // The scope will be null or empty if + // there's nothing to replicate in that WALEdit + fam = CellUtil.cloneFamily(cell); + if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { + cells.remove(i); + } } } if (cells.size() < size / 2) { @@ -56,4 +79,41 @@ public class ScopeWALEntryFilter implements WALEntryFilter { return entry; } + private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) { + byte[] fam; + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(cell); + } catch (IOException e) { + LOG.warn("Failed to get bulk load events information from the WAL file.", e); + return cell; + } + List<StoreDescriptor> storesList = bld.getStoresList(); + // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList + List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList); + Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator(); + boolean anyStoreRemoved = false; + while (copiedStoresListIterator.hasNext()) { + StoreDescriptor sd = copiedStoresListIterator.next(); + fam = sd.getFamilyName().toByteArray(); + if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { + copiedStoresListIterator.remove(); + anyStoreRemoved = true; + } + } + + if (!anyStoreRemoved) { + return cell; + } else if (copiedStoresList.isEmpty()) { + return null; + } + BulkLoadDescriptor.Builder newDesc = + BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) + .setEncodedRegionName(bld.getEncodedRegionName()) + .setBulkloadSeqNum(bld.getBulkloadSeqNum()); + newDesc.addAllStores(copiedStoresList); + BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); + return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, + cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java index 642ee8a..f10849b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java @@ -18,14 +18,20 @@ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.hbase.wal.WAL.Entry; @@ -52,19 +58,36 @@ public class TableCfWALEntryFilter implements WALEntryFilter { } int size = cells.size(); + // If null means user has explicitly not configured any table CFs so all the tables data are + // applicable for replication + if (tableCFs == null) { + return entry; + } // return null(prevent replicating) if logKey's table isn't in this peer's - // replicable table list (empty tableCFs means all table are replicable) - if (tableCFs != null && !tableCFs.containsKey(tabName)) { + // replicable table list + if (!tableCFs.containsKey(tabName)) { return null; } else { - List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName); + List<String> cfs = tableCFs.get(tabName); for (int i = size - 1; i >= 0; i--) { Cell cell = cells.get(i); - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if ((cfs != null) && !cfs.contains(Bytes.toString( - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { - cells.remove(i); + // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so + // cannot refactor into one now, can revisit and see if any way to unify them. + // Filter bulk load entries separately + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell); + if (filteredBulkLoadEntryCell != null) { + cells.set(i, filteredBulkLoadEntryCell); + } else { + cells.remove(i); + } + } else { + // ignore(remove) kv if its cf isn't in the replicable cf list + // (empty cfs means all cfs of this table are replicable) + if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(), + cell.getFamilyOffset(), cell.getFamilyLength()))) { + cells.remove(i); + } } } } @@ -74,4 +97,41 @@ public class TableCfWALEntryFilter implements WALEntryFilter { return entry; } + private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) { + byte[] fam; + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(cell); + } catch (IOException e) { + LOG.warn("Failed to get bulk load events information from the WAL file.", e); + return cell; + } + List<StoreDescriptor> storesList = bld.getStoresList(); + // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList + List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList); + Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator(); + boolean anyStoreRemoved = false; + while (copiedStoresListIterator.hasNext()) { + StoreDescriptor sd = copiedStoresListIterator.next(); + fam = sd.getFamilyName().toByteArray(); + if (cfs != null && !cfs.contains(Bytes.toString(fam))) { + copiedStoresListIterator.remove(); + anyStoreRemoved = true; + } + } + + if (!anyStoreRemoved) { + return cell; + } else if (copiedStoresList.isEmpty()) { + return null; + } + BulkLoadDescriptor.Builder newDesc = + BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) + .setEncodedRegionName(bld.getEncodedRegionName()) + .setBulkloadSeqNum(bld.getBulkloadSeqNum()); + newDesc.addAllStores(copiedStoresList); + BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); + return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, + cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + } } 
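Both ScopeWALEntryFilter and TableCfWALEntryFilter above rebuild the BulkLoadDescriptor the same way, and the patch leaves a TODO about unifying the two. One possible shared helper, sketched here only to illustrate that idea against the protobuf API used in the diff; the FamilyPredicate interface and class name are hypothetical and not part of the patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;

final class BulkLoadFilterSketch {
  /** Decides whether a column family (raw bytes) should be replicated. */
  interface FamilyPredicate {
    boolean replicate(byte[] family);
  }

  /**
   * Returns the descriptor unchanged if every store is replicable, null if none are,
   * or a rebuilt descriptor containing only the replicable stores otherwise.
   */
  static BulkLoadDescriptor filterStores(BulkLoadDescriptor bld, FamilyPredicate pred) {
    List<StoreDescriptor> kept = new ArrayList<StoreDescriptor>();
    for (StoreDescriptor sd : bld.getStoresList()) {
      if (pred.replicate(sd.getFamilyName().toByteArray())) {
        kept.add(sd);
      }
    }
    if (kept.size() == bld.getStoresCount()) {
      return bld;  // nothing filtered out, keep the original cell
    }
    if (kept.isEmpty()) {
      return null; // caller drops the whole bulk load cell
    }
    return BulkLoadDescriptor.newBuilder()
        .setTableName(bld.getTableName())
        .setEncodedRegionName(bld.getEncodedRegionName())
        .setBulkloadSeqNum(bld.getBulkloadSeqNum())
        .addAllStores(kept)
        .build();
  }
}

With such a helper, ScopeWALEntryFilter would supply a predicate checking that the family is present in the scopes map with a scope other than HConstants.REPLICATION_SCOPE_LOCAL, while TableCfWALEntryFilter would supply one checking cfs == null || cfs.contains(Bytes.toString(family)), mirroring the per-cell checks already in each filter.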
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java new file mode 100644 index 0000000..9bfea4b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication.master; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; +import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before + * deleting it from hfile archive directory. 
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements Abortable { + private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class); + private ZooKeeperWatcher zkw; + private ReplicationQueuesClient rqc; + private boolean stopped = false; + private boolean aborted; + + @Override + public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { + // all members of this class are null if replication is disabled, + // so we cannot filter the files + if (this.getConf() == null) { + return files; + } + + final Set<String> hfileRefs; + try { + // The concurrently created new hfile entries in ZK may not be included in the return list, + // but they won't be deleted because they're not in the checking set. + hfileRefs = loadHFileRefsFromPeers(); + } catch (KeeperException e) { + LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files"); + return Collections.emptyList(); + } + return Iterables.filter(files, new Predicate<FileStatus>() { + @Override + public boolean apply(FileStatus file) { + String hfile = file.getPath().getName(); + boolean foundHFileRefInQueue = hfileRefs.contains(hfile); + if (LOG.isDebugEnabled()) { + if (foundHFileRefInQueue) { + LOG.debug("Found hfile reference in ZK, keeping: " + hfile); + } else { + LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile); + } + } + return !foundHFileRefInQueue; + } + }); + } + + /** + * Load all hfile references in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all hfile references in the zookeeper at the start of this call. + * However, some newly created hfile references during the call may not be included. + */ + private Set<String> loadHFileRefsFromPeers() throws KeeperException { + Set<String> hfileRefs = Sets.newHashSet(); + List<String> listOfPeers; + for (int retry = 0;; retry++) { + int v0 = rqc.getHFileRefsNodeChangeVersion(); + hfileRefs.clear(); + listOfPeers = rqc.getAllPeersFromHFileRefsQueue(); + if (listOfPeers == null) { + LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions."); + return ImmutableSet.of(); + } + for (String id : listOfPeers) { + List<String> peerHFileRefs = rqc.getReplicableHFiles(id); + if (peerHFileRefs != null) { + hfileRefs.addAll(peerHFileRefs); + } + } + int v1 = rqc.getHFileRefsNodeChangeVersion(); + if (v0 == v1) { + return hfileRefs; + } + LOG.debug(String.format("Replication hfile references node cversion changed from " + + "%d to %d, retry = %d", v0, v1, retry)); + } + } + + @Override + public void setConf(Configuration config) { + // If either replication or replication of bulk load hfiles is disabled, keep all members null + if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, + HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean( + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) { + LOG.warn(HConstants.REPLICATION_ENABLE_KEY + + " is not enabled so allowing all hfile references to be deleted. Better to remove " + + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS + + " configuration."); + return; + } + // Make my own Configuration. Then I'll have my own connection to zk that + // I can close myself when time comes. 
+ Configuration conf = new Configuration(config); + super.setConf(conf); + try { + initReplicationQueuesClient(conf); + } catch (IOException e) { + LOG.error("Error while configuring " + this.getClass().getName(), e); + } + } + + private void initReplicationQueuesClient(Configuration conf) + throws ZooKeeperConnectionException, IOException { + this.zkw = new ZooKeeperWatcher(conf, "replicationHFileCleaner", null); + this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this); + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + this.stopped = true; + if (this.zkw != null) { + LOG.info("Stopping " + this.zkw); + this.zkw.close(); + } + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("Aborting ReplicationHFileCleaner because " + why, e); + this.aborted = true; + stop(why); + } + + @Override + public boolean isAborted() { + return this.aborted; + } + + @Override + public boolean isFileDeletable(FileStatus fStat) { + Set<String> hfileRefsFromQueue; + // all members of this class are null if replication is disabled, + // so do not stop from deleting the file + if (getConf() == null) { + return true; + } + + try { + hfileRefsFromQueue = loadHFileRefsFromPeers(); + } catch (KeeperException e) { + LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable " + + "file for " + fStat.getPath()); + return false; + } + return !hfileRefsFromQueue.contains(fStat.getPath().getName()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java new file mode 100644 index 0000000..8d5c6d4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This will load all the xml configuration files for the source cluster replication ID from + * user configured replication configuration directory. + */ +@InterfaceAudience.Private +public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider { + private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class); + // Map containing all the source clusters configurations against their replication cluster id + private Map<String, Configuration> sourceClustersConfs = new HashMap<>(); + private static final String XML = ".xml"; + + @Override + public Configuration getConf(Configuration sinkConf, String replicationClusterId) + throws IOException { + if (sourceClustersConfs.get(replicationClusterId) == null) { + synchronized (this.sourceClustersConfs) { + if (sourceClustersConfs.get(replicationClusterId) == null) { + LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId); + // Load only user provided client configurations. + Configuration sourceClusterConf = new Configuration(false); + + String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR); + if (replicationConfDir == null) { + LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured."); + URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml"); + if (resource != null) { + String path = resource.getPath(); + replicationConfDir = path.substring(0, path.lastIndexOf("/")); + } else { + replicationConfDir = System.getenv("HBASE_CONF_DIR"); + } + } + + LOG.info("Loading source cluster " + replicationClusterId + + " file system configurations from xml files under directory " + replicationConfDir); + File confDir = new File(replicationConfDir, replicationClusterId); + String[] listofConfFiles = FileUtil.list(confDir); + for (String confFile : listofConfFiles) { + if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) { + // Add all the user provided client conf files + sourceClusterConf.addResource(new Path(confDir.getPath(), confFile)); + } + } + this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf); + } + } + } + return this.sourceClustersConfs.get(replicationClusterId); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 7c07ecc..d51d512 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -37,24 +37,26 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; /** - * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} + * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} * implementation for replicating to another HBase cluster. * For the slave cluster it selects a random number of peers * using a replication ratio. For example, if replication ration = 0.1 @@ -84,8 +86,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // Handles connecting to peer region servers private ReplicationSinkManager replicationSinkMgr; private boolean peersSelected = false; + private String replicationClusterId = ""; private ThreadPoolExecutor exec; private int maxThreads; + private Path baseNamespaceDir; + private Path hfileArchiveDir; + private boolean replicationBulkLoadDataEnabled; @Override public void init(Context context) throws IOException { @@ -108,7 +114,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>()); + new SynchronousQueue<Runnable>()); + + this.replicationBulkLoadDataEnabled = + conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + if (this.replicationBulkLoadDataEnabled) { + replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID); + } + // Construct base namespace directory and hfile archive directory path + Path rootDir = FSUtils.getRootDir(conf); + Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR); + baseNamespaceDir = new Path(rootDir, baseNSDir); + hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir)); } private void decorateConf() { @@ -317,8 +335,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi try { sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); - 
ReplicationProtbufUtil.replicateWALEntry(rrs, - entries.toArray(new Entry[entries.size()])); + ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); replicationSinkMgr.reportSinkSuccess(sinkPeer); return ordinal; http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java new file mode 100644 index 0000000..17f6780 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.math.BigInteger; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.FsDelegationToken; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * It is used for 
replicating HFile entries. It will first copy parallely all the hfiles to a local + * staging directory and then it will use ({@link LoadIncrementalHFiles} to prepare a collection of + * {@link LoadQueueItem} which will finally be loaded(replicated) into the table of this cluster. + */ +@InterfaceAudience.Private +public class HFileReplicator { + /** Maximum number of threads to allow in pool to copy hfiles during replication */ + public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY = + "hbase.replication.bulkload.copy.maxthreads"; + public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10; + /** Number of hfiles to copy per thread during replication */ + public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY = + "hbase.replication.bulkload.copy.hfiles.perthread"; + public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10; + + private static final Log LOG = LogFactory.getLog(HFileReplicator.class); + private final String UNDERSCORE = "_"; + private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx"); + + private Configuration sourceClusterConf; + private String sourceBaseNamespaceDirPath; + private String sourceHFileArchiveDirPath; + private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap; + private FileSystem sinkFs; + private FsDelegationToken fsDelegationToken; + private UserProvider userProvider; + private Configuration conf; + private Connection connection; + private String hbaseStagingDir; + private ThreadPoolExecutor exec; + private int maxCopyThreads; + private int copiesPerThread; + + public HFileReplicator(Configuration sourceClusterConf, + String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath, + Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf, + Connection connection) throws IOException { + this.sourceClusterConf = sourceClusterConf; + this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath; + this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath; + this.bulkLoadHFileMap = tableQueueMap; + this.conf = conf; + this.connection = connection; + + userProvider = UserProvider.instantiate(conf); + fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir"); + this.maxCopyThreads = + this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY, + REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT); + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setNameFormat("HFileReplicationCallable-%1$d"); + this.exec = + new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), builder.build()); + this.exec.allowCoreThreadTimeOut(true); + this.copiesPerThread = + conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY, + REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT); + + sinkFs = FileSystem.get(conf); + } + + public Void replicate() throws IOException { + // Copy all the hfiles to the local file system + Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir(); + + int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); + + for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) { + String tableNameString = tableStagingDir.getKey(); + Path stagingDir = tableStagingDir.getValue(); + + LoadIncrementalHFiles loadHFiles = null; + try { + loadHFiles = new LoadIncrementalHFiles(conf); + } catch (Exception e) { + LOG.error("Failed to initialize 
LoadIncrementalHFiles for replicating bulk loaded" + + " data.", e); + throw new IOException(e); + } + Configuration newConf = HBaseConfiguration.create(conf); + newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); + loadHFiles.setConf(newConf); + + TableName tableName = TableName.valueOf(tableNameString); + Table table = this.connection.getTable(tableName); + + // Prepare collection of queue of hfiles to be loaded(replicated) + Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>(); + loadHFiles.prepareHFileQueue(stagingDir, table, queue, false); + + if (queue.isEmpty()) { + LOG.warn("Replication process did not find any files to replicate in directory " + + stagingDir.toUri()); + return null; + } + + try (RegionLocator locator = connection.getRegionLocator(tableName)) { + + fsDelegationToken.acquireDelegationToken(sinkFs); + + // Set the staging directory which will be used by LoadIncrementalHFiles for loading the + // data + loadHFiles.setBulkToken(stagingDir.toString()); + + doBulkLoad(loadHFiles, table, queue, locator, maxRetries); + } finally { + cleanup(stagingDir.toString(), table); + } + } + return null; + } + + private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table, + Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException { + int count = 0; + Pair<byte[][], byte[][]> startEndKeys; + while (!queue.isEmpty()) { + // need to reload split keys each iteration. + startEndKeys = locator.getStartEndKeys(); + if (count != 0) { + LOG.warn("Error occured while replicating HFiles, retry attempt " + count + " with " + + queue.size() + " files still remaining to replicate."); + } + + if (maxRetries != 0 && count >= maxRetries) { + throw new IOException("Retry attempted " + count + + " times without completing, bailing out."); + } + count++; + + // Try bulk load + loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys); + } + } + + private void cleanup(String stagingDir, Table table) { + // Release the file system delegation token + fsDelegationToken.releaseDelegationToken(); + // Delete the staging directory + if (stagingDir != null) { + try { + sinkFs.delete(new Path(stagingDir), true); + } catch (IOException e) { + LOG.warn("Failed to delete the staging directory " + stagingDir, e); + } + } + // Do not close the file system + + /* + * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn( + * "Failed to close the file system"); } } + */ + + // Close the table + if (table != null) { + try { + table.close(); + } catch (IOException e) { + LOG.warn("Failed to close the table.", e); + } + } + } + + private Map<String, Path> copyHFilesToStagingDir() throws IOException { + Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>(); + Pair<byte[], List<String>> familyHFilePathsPair; + List<String> hfilePaths; + byte[] family; + Path familyStagingDir; + int familyHFilePathsPairsListSize; + int totalNoOfHFiles; + List<Pair<byte[], List<String>>> familyHFilePathsPairsList; + FileSystem sourceFs = null; + + try { + Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath); + /* + * Path#getFileSystem will by default get the FS from cache. If both source and sink cluster + * has same FS name service then it will return peer cluster FS. To avoid this we explicitly + * disable the loading of FS from cache, so that a new FS is created with source cluster + * configuration. 
+ */ + String sourceScheme = sourceClusterPath.toUri().getScheme(); + String disableCacheName = + String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme }); + sourceClusterConf.setBoolean(disableCacheName, true); + + sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf); + + User user = userProvider.getCurrent(); + // For each table name in the map + for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap + .entrySet()) { + String tableName = tableEntry.getKey(); + + // Create staging directory for each table + Path stagingDir = + createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName)); + + familyHFilePathsPairsList = tableEntry.getValue(); + familyHFilePathsPairsListSize = familyHFilePathsPairsList.size(); + + // For each list of family hfile paths pair in the table + for (int i = 0; i < familyHFilePathsPairsListSize; i++) { + familyHFilePathsPair = familyHFilePathsPairsList.get(i); + + family = familyHFilePathsPair.getFirst(); + hfilePaths = familyHFilePathsPair.getSecond(); + + familyStagingDir = new Path(stagingDir, Bytes.toString(family)); + totalNoOfHFiles = hfilePaths.size(); + + // For each list of hfile paths for the family + List<Future<Void>> futures = new ArrayList<Future<Void>>(); + Callable<Void> c; + Future<Void> future; + int currentCopied = 0; + // Copy the hfiles parallely + while (totalNoOfHFiles > currentCopied + this.copiesPerThread) { + c = + new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied, + currentCopied + this.copiesPerThread)); + future = exec.submit(c); + futures.add(future); + currentCopied += this.copiesPerThread; + } + + int remaining = totalNoOfHFiles - currentCopied; + if (remaining > 0) { + c = + new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied, + currentCopied + remaining)); + future = exec.submit(c); + futures.add(future); + } + + for (Future<Void> f : futures) { + try { + f.get(); + } catch (InterruptedException e) { + InterruptedIOException iioe = + new InterruptedIOException( + "Failed to copy HFiles to local file system. This will be retried again " + + "by the source cluster."); + iioe.initCause(e); + throw iioe; + } catch (ExecutionException e) { + throw new IOException("Failed to copy HFiles to local file system. This will " + + "be retried again by the source cluster.", e); + } + } + } + // Add the staging directory to this table. 
Staging directory contains all the hfiles + // belonging to this table + mapOfCopiedHFiles.put(tableName, stagingDir); + } + return mapOfCopiedHFiles; + } finally { + if (sourceFs != null) { + sourceFs.close(); + } + if(exec != null) { + exec.shutdown(); + } + } + } + + private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException { + String tblName = tableName.getNameAsString().replace(":", UNDERSCORE); + int RANDOM_WIDTH = 320; + int RANDOM_RADIX = 32; + String doubleUnderScore = UNDERSCORE + UNDERSCORE; + String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore + + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX)); + return createStagingDir(baseDir, user, randomDir); + } + + private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException { + Path p = new Path(baseDir, randomDir); + sinkFs.mkdirs(p, PERM_ALL_ACCESS); + sinkFs.setPermission(p, PERM_ALL_ACCESS); + return p; + } + + /** + * This class will copy the given hfiles from the given source file system to the given local file + * system staging directory. + */ + private class Copier implements Callable<Void> { + private FileSystem sourceFs; + private Path stagingDir; + private List<String> hfiles; + + public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles) + throws IOException { + this.sourceFs = sourceFs; + this.stagingDir = stagingDir; + this.hfiles = hfiles; + } + + @Override + public Void call() throws IOException { + Path sourceHFilePath; + Path localHFilePath; + int totalHFiles = hfiles.size(); + for (int i = 0; i < totalHFiles; i++) { + sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i)); + localHFilePath = new Path(stagingDir, sourceHFilePath.getName()); + try { + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + // If any other exception other than FNFE then we will fail the replication requests and + // source will retry to replicate these data. + } catch (FileNotFoundException e) { + LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + + ". Trying to copy from hfile archive directory.", + e); + sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i)); + + try { + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + } catch (FileNotFoundException e1) { + // This will mean that the hfile does not exists any where in source cluster FS. So we + // cannot do anything here just log and return. + LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + + ". 
Hence ignoring this hfile from replication..", + e1); + return null; + } + } + sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index 37dc1dd..f308daf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -47,7 +47,7 @@ public class MetricsSink { if (lastTimestampForAge != timestamp) { lastTimestampForAge = timestamp; age = System.currentTimeMillis() - lastTimestampForAge; - } + } mss.setLastAppliedOpAge(age); return age; } @@ -72,6 +72,17 @@ public class MetricsSink { } /** + * Convience method to change metrics when a batch of operations are applied. + * + * @param batchSize total number of mutations that are applied/replicated + * @param hfileSize total number of hfiles that are applied/replicated + */ + public void applyBatch(long batchSize, long hfileSize) { + applyBatch(batchSize); + mss.incrAppliedHFiles(hfileSize); + } + + /** * Get the Age of Last Applied Op * @return ageOfLastAppliedOp */ http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index f9f7001..9687af7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -40,11 +40,13 @@ public class MetricsSource { // tracks last shipped timestamp for each wal group private Map<String, Long> lastTimeStamps = new HashMap<String, Long>(); private int lastQueueSize = 0; + private long lastHFileRefsQueueSize = 0; private String id; private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; + /** * Constructor used to register the metrics * @@ -143,6 +145,18 @@ public class MetricsSource { globalSourceSource.incrShippedKBs(sizeInKB); } + /** + * Convience method to apply changes to metrics do to shipping a batch of logs. + * + * @param batchSize the size of the batch that was shipped to sinks. + * @param hfiles total number of hfiles shipped to sinks. 
+ */ + public void shipBatch(long batchSize, int sizeInKB, long hfiles) { + shipBatch(batchSize, sizeInKB); + singleSourceSource.incrHFilesShipped(hfiles); + globalSourceSource.incrHFilesShipped(hfiles); + } + /** increase the byte number read by source from log file */ public void incrLogReadInBytes(long readInBytes) { singleSourceSource.incrLogReadInBytes(readInBytes); @@ -153,8 +167,10 @@ public class MetricsSource { public void clear() { singleSourceSource.clear(); globalSourceSource.decrSizeOfLogQueue(lastQueueSize); + globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); lastTimeStamps.clear(); lastQueueSize = 0; + lastHFileRefsQueueSize = 0; } /** @@ -194,4 +210,19 @@ public class MetricsSource { public String getPeerID() { return id; } + + public void incrSizeOfHFileRefsQueue(long size) { + singleSourceSource.incrSizeOfHFileRefsQueue(size); + globalSourceSource.incrSizeOfHFileRefsQueue(size); + lastHFileRefsQueueSize = size; + } + + public void decrSizeOfHFileRefsQueue(int size) { + singleSourceSource.decrSizeOfHFileRefsQueue(size); + globalSourceSource.decrSizeOfHFileRefsQueue(size); + lastHFileRefsQueueSize -= size; + if (lastHFileRefsQueueSize < 0) { + lastHFileRefsQueueSize = 0; + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index b3db0f6..30153f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -649,8 +649,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { // set the region name for the target region replica Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = - ReplicationProtbufUtil.buildReplicateWALEntryRequest( - entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); + ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location + .getRegionInfo().getEncodedNameAsBytes(), null, null, null); try { PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); controller.setCallTimeout(timeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index b396dfc..d2a0776 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,7 +45,10 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.wal.WALKey; @@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -71,6 +76,7 @@ public class Replication extends WALActionsListener.Base implements private static final Log LOG = LogFactory.getLog(Replication.class); private boolean replication; + private boolean replicationForBulkLoadData; private ReplicationSourceManager replicationManager; private ReplicationQueues replicationQueues; private ReplicationPeers replicationPeers; @@ -84,7 +90,6 @@ public class Replication extends WALActionsListener.Base implements private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; - /** * Instantiate the replication management (if rep is enabled). * @param server Hosting server @@ -109,11 +114,20 @@ public class Replication extends WALActionsListener.Base implements this.server = server; this.conf = this.server.getConfiguration(); this.replication = isReplication(this.conf); + this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf); this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d") .setDaemon(true) .build()); + if (this.replicationForBulkLoadData) { + if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null + || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) { + throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID + + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + + " is set to true."); + } + } if (replication) { try { this.replicationQueues = @@ -158,6 +172,15 @@ public class Replication extends WALActionsListener.Base implements return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); } + /** + * @param c Configuration to look at + * @return True if replication for bulk load data is enabled. 
+ */ + public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { + return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + } + /* * Returns an object to listen to new wal changes **/ @@ -187,14 +210,22 @@ public class Replication extends WALActionsListener.Base implements /** * Carry on the list of log entries down to the sink * @param entries list of entries to replicate - * @param cells The data -- the cells -- that <code>entries</code> describes (the entries - * do not contain the Cells we are replicating; they are passed here on the side in this - * CellScanner). + * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not + * contain the Cells we are replicating; they are passed here on the side in this + * CellScanner). + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace + * directory required for replicating hfiles + * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory * @throws IOException */ - public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException { + public void replicateLogEntries(List<WALEntry> entries, CellScanner cells, + String replicationClusterId, String sourceBaseNamespaceDirPath, + String sourceHFileArchiveDirPath) throws IOException { if (this.replication) { - this.replicationSink.replicateEntries(entries, cells); + this.replicationSink.replicateEntries(entries, cells, replicationClusterId, + sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath); } } @@ -226,34 +257,44 @@ public class Replication extends WALActionsListener.Base implements } @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, - WALEdit logEdit) { - scopeWALEdits(htd, logKey, logEdit); + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) + throws IOException { + scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager()); } /** - * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys - * from compaction WAL edits and if the scope is local. + * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from + * compaction WAL edits and if the scope is local. 
* @param htd Descriptor used to find the scope to use * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes + * @param replicationManager Manager used to add bulk load events hfile references + * @throws IOException If failed to parse the WALEdit */ - public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, - WALEdit logEdit) { - NavigableMap<byte[], Integer> scopes = - new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); + public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit, + Configuration conf, ReplicationSourceManager replicationManager) throws IOException { + NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); byte[] family; + boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf); for (Cell cell : logEdit.getCells()) { - family = CellUtil.cloneFamily(cell); - // This is expected and the KV should not be replicated - if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue; - // Unexpected, has a tendency to happen in unit tests - assert htd.getFamily(family) != null; + if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell); + } else { + // Skip the flush/compaction/region events + continue; + } + } else { + family = CellUtil.cloneFamily(cell); + // Unexpected, has a tendency to happen in unit tests + assert htd.getFamily(family) != null; - int scope = htd.getFamily(family).getScope(); - if (scope != REPLICATION_SCOPE_LOCAL && - !scopes.containsKey(family)) { - scopes.put(family, scope); + if (!scopes.containsKey(family)) { + int scope = htd.getFamily(family).getScope(); + if (scope != REPLICATION_SCOPE_LOCAL) { + scopes.put(family, scope); + } + } } } if (!scopes.isEmpty()) { @@ -261,6 +302,40 @@ public class Replication extends WALActionsListener.Base implements } } + private static void scopeBulkLoadEdits(HTableDescriptor htd, + ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes, + TableName tableName, Cell cell) throws IOException { + byte[] family; + try { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + for (StoreDescriptor s : bld.getStoresList()) { + family = s.getFamilyName().toByteArray(); + if (!scopes.containsKey(family)) { + int scope = htd.getFamily(family).getScope(); + if (scope != REPLICATION_SCOPE_LOCAL) { + scopes.put(family, scope); + addHFileRefsToQueue(replicationManager, tableName, family, s); + } + } else { + addHFileRefsToQueue(replicationManager, tableName, family, s); + } + } + } catch (IOException e) { + LOG.error("Failed to get bulk load events information from the wal file.", e); + throw e; + } + } + + private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager, + TableName tableName, byte[] family, StoreDescriptor s) throws IOException { + try { + replicationManager.addHFileRefs(tableName, family, s.getStoreFileList()); + } catch (ReplicationException e) { + LOG.error("Failed to create hfile references in ZK.", e); + throw new IOException(e); + } + } + @Override public void preLogRoll(Path oldPath, Path newPath) throws IOException { getReplicationManager().preLogRoll(newPath); @@ -272,8 +347,7 @@ public class Replication extends WALActionsListener.Base implements } /** - * This method modifies the master's configuration in order 
to inject - * replication-related features + * This method modifies the master's configuration in order to inject replication-related features * @param conf */ public static void decorateMasterConfiguration(Configuration conf) { @@ -285,6 +359,13 @@ public class Replication extends WALActionsListener.Base implements if (!plugins.contains(cleanerClass)) { conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); } + if (isReplicationForBulkLoadDataEnabled(conf)) { + plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); + cleanerClass = ReplicationHFileCleaner.class.getCanonicalName(); + if (!plugins.contains(cleanerClass)) { + conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass); + } + } } /* http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index f10f5e3..9e7b3af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -33,15 +33,16 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; @@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * <p> @@ -78,6 +84,9 @@ public class ReplicationSink { private final MetricsSink metrics; private final AtomicLong totalReplicatedEdits = new AtomicLong(); private final Object sharedHtableConLock = new Object(); + // Number of hfiles that we successfully replicated + private long hfilesReplicated = 0; + private SourceFSConfigurationProvider provider; /** * Create a sink for replication @@ -91,6 +100,18 @@ public class ReplicationSink { this.conf = HBaseConfiguration.create(conf); decorateConf(); this.metrics = new MetricsSink(); + + String className = + conf.get("hbase.replication.source.fs.conf.provider", + 
DefaultSourceFSConfigurationProvider.class.getCanonicalName()); + try { + @SuppressWarnings("rawtypes") + Class c = Class.forName(className); + this.provider = (SourceFSConfigurationProvider) c.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("Configured source fs configuration provider class " + + className + " throws error.", e); + } } /** @@ -113,9 +134,16 @@ public class ReplicationSink { * operates against raw protobuf type saving on a conversion from pb to pojo. * @param entries * @param cells - * @throws IOException + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace + * directory + * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory + * @throws IOException If failed to replicate the data */ - public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException { + public void replicateEntries(List<WALEntry> entries, final CellScanner cells, + String replicationClusterId, String sourceBaseNamespaceDirPath, + String sourceHFileArchiveDirPath) throws IOException { if (entries.isEmpty()) return; if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner"); // Very simple optimization where we batch sequences of rows going @@ -126,6 +154,10 @@ public class ReplicationSink { // invocation of this method per table and cluster id. Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<TableName, Map<List<UUID>, List<Row>>>(); + + // Map of table name Vs list of pair of family and list of hfile paths from its namespace + Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null; + for (WALEntry entry : entries) { TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); @@ -138,33 +170,60 @@ public class ReplicationSink { throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } Cell cell = cells.current(); - if (isNewRowOrType(previousCell, cell)) { - // Create new mutation - m = CellUtil.isDelete(cell)? - new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()): - new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - List<UUID> clusterIds = new ArrayList<UUID>(); - for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){ - clusterIds.add(toUUID(clusterId)); + // Handle bulk load hfiles replication + if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + if (bulkLoadHFileMap == null) { + bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>(); } - m.setClusterIds(clusterIds); - addToHashMultiMap(rowMap, table, clusterIds, m); - } - if (CellUtil.isDelete(cell)) { - ((Delete)m).addDeleteMarker(cell); + buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell); } else { - ((Put)m).add(cell); + // Handle wal replication + if (isNewRowOrType(previousCell, cell)) { + // Create new mutation + m = + CellUtil.isDelete(cell) ? 
new Delete(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength()); + List<UUID> clusterIds = new ArrayList<UUID>(); + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + m.setClusterIds(clusterIds); + addToHashMultiMap(rowMap, table, clusterIds, m); + } + if (CellUtil.isDelete(cell)) { + ((Delete) m).addDeleteMarker(cell); + } else { + ((Put) m).add(cell); + } + previousCell = cell; } - previousCell = cell; } totalReplicated++; } - for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) { - batch(entry.getKey(), entry.getValue().values()); + + // TODO Replicating mutations and bulk loaded data can be made parallel + if (!rowMap.isEmpty()) { + LOG.debug("Started replicating mutations."); + for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) { + batch(entry.getKey(), entry.getValue().values()); + } + LOG.debug("Finished replicating mutations."); + } + + if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) { + LOG.debug("Started replicating bulk loaded data."); + HFileReplicator hFileReplicator = + new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId), + sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, + getConnection()); + hFileReplicator.replicate(); + LOG.debug("Finished replicating bulk loaded data."); } + int size = entries.size(); this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime()); - this.metrics.applyBatch(size); + this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated); this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); @@ -172,6 +231,76 @@ public class ReplicationSink { } } + private void buildBulkLoadHFileMap( + final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, + Cell cell) throws IOException { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + List<StoreDescriptor> storesList = bld.getStoresList(); + int storesSize = storesList.size(); + for (int j = 0; j < storesSize; j++) { + StoreDescriptor storeDescriptor = storesList.get(j); + List<String> storeFileList = storeDescriptor.getStoreFileList(); + int storeFilesSize = storeFileList.size(); + hfilesReplicated += storeFilesSize; + for (int k = 0; k < storeFilesSize; k++) { + byte[] family = storeDescriptor.getFamilyName().toByteArray(); + + // Build hfile relative path from its namespace + String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family); + + String tableName = table.getNameWithNamespaceInclAsString(); + if (bulkLoadHFileMap.containsKey(tableName)) { + List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName); + boolean foundFamily = false; + for (int i = 0; i < familyHFilePathsList.size(); i++) { + Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i); + if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) { + // Found family already present, just add the path to the existing list + familyHFilePathsPair.getSecond().add(pathToHfileFromNS); + foundFamily = true; + break; + } + } + if (!foundFamily) { + // Family not found, add this family and its hfile paths pair to the list + addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList); + } + } else { + // Add this table entry into the map + 
addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName); + } + } + } + } + + private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, + List<Pair<byte[], List<String>>> familyHFilePathsList) { + List<String> hfilePaths = new ArrayList<String>(); + hfilePaths.add(pathToHfileFromNS); + familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths)); + } + + private void addNewTableEntryInMap( + final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family, + String pathToHfileFromNS, String tableName) { + List<String> hfilePaths = new ArrayList<String>(); + hfilePaths.add(pathToHfileFromNS); + Pair<byte[], List<String>> newFamilyHFilePathsPair = + new Pair<byte[], List<String>>(family, hfilePaths); + List<Pair<byte[], List<String>>> newFamilyHFilePathsList = + new ArrayList<Pair<byte[], List<String>>>(); + newFamilyHFilePathsList.add(newFamilyHFilePathsPair); + bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList); + } + + private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile, + byte[] family) { + return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR) + .append(table.getQualifierAsString()).append(Path.SEPARATOR) + .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR) + .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString(); + } + /** * @param previousCell * @param cell @@ -241,22 +370,13 @@ public class ReplicationSink { } Table table = null; try { - // See https://en.wikipedia.org/wiki/Double-checked_locking - Connection connection = this.sharedHtableCon; - if (connection == null) { - synchronized (sharedHtableConLock) { - connection = this.sharedHtableCon; - if (connection == null) { - connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf); - } - } - } + Connection connection = getConnection(); table = connection.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows, null); } } catch (InterruptedException ix) { - throw (InterruptedIOException)new InterruptedIOException().initCause(ix); + throw (InterruptedIOException) new InterruptedIOException().initCause(ix); } finally { if (table != null) { table.close(); @@ -264,6 +384,20 @@ public class ReplicationSink { } } + private Connection getConnection() throws IOException { + // See https://en.wikipedia.org/wiki/Double-checked_locking + Connection connection = sharedHtableCon; + if (connection == null) { + synchronized (sharedHtableConLock) { + connection = sharedHtableCon; + if (connection == null) { + connection = sharedHtableCon = ConnectionFactory.createConnection(conf); + } + } + } + return connection; + } + /** * Get a string representation of this sink's metrics * @return string with the total replicated edits count and the date http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3d99523..868ddee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -47,9 +46,10 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -59,8 +59,12 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKey; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; @@ -223,6 +227,34 @@ public class ReplicationSource extends Thread } } + @Override + public void addHFileRefs(TableName tableName, byte[] family, List<String> files) + throws ReplicationException { + String peerId = peerClusterZnode; + if (peerId.contains("-")) { + // peerClusterZnode will be in the form peerId + "-" + rsZNode. 
+ // A peerId will not have "-" in its name, see HBASE-11394 + peerId = peerClusterZnode.split("-")[0]; + } + Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs(); + if (tableCFMap != null) { + List<String> tableCfs = tableCFMap.get(tableName); + if (tableCFMap.containsKey(tableName) + && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { + this.replicationQueues.addHFileRefs(peerId, files); + metrics.incrSizeOfHFileRefsQueue(files.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + + Bytes.toString(family) + " to peer id " + peerId); + } + } else { + // user has explicitly not defined any table cfs for replication, means replicate all the + // data + this.replicationQueues.addHFileRefs(peerId, files); + metrics.incrSizeOfHFileRefsQueue(files.size()); + } + } + private void uninitialize() { LOG.debug("Source exiting " + this.peerId); metrics.clear(); @@ -471,6 +503,8 @@ public class ReplicationSource extends Thread private int currentSize = 0; // Indicates whether this particular worker is running private boolean workerRunning = true; + // Current number of hfiles that we need to replicate + private long currentNbHFiles = 0; public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) { @@ -550,6 +584,7 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbOperations = 0; + currentNbHFiles = 0; List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1); currentSize = 0; try { @@ -701,6 +736,28 @@ public class ReplicationSource extends Thread return seenEntries == 0 && processEndOfFile(); } + private void cleanUpHFileRefs(WALEdit edit) throws IOException { + String peerId = peerClusterZnode; + if (peerId.contains("-")) { + // peerClusterZnode will be in the form peerId + "-" + rsZNode. + // A peerId will not have "-" in its name, see HBASE-11394 + peerId = peerClusterZnode.split("-")[0]; + } + List<Cell> cells = edit.getCells(); + for (int i = 0; i < cells.size(); i++) { + Cell cell = cells.get(i); + if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + List<StoreDescriptor> stores = bld.getStoresList(); + for (int j = 0; j < stores.size(); j++) { + List<String> storeFileList = stores.get(j).getStoreFileList(); + manager.cleanUpHFileRefs(peerId, storeFileList); + metrics.decrSizeOfHFileRefsQueue(storeFileList.size()); + } + } + } + } + /** * Poll for the next path * @return true if a path was obtained, false if not @@ -853,14 +910,31 @@ public class ReplicationSource extends Thread private int countDistinctRowKeys(WALEdit edit) { List<Cell> cells = edit.getCells(); int distinctRowKeys = 1; + int totalHFileEntries = 0; Cell lastCell = cells.get(0); + for (int i = 0; i < edit.size(); i++) { + // Count HFiles to be replicated + if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { + try { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); + List<StoreDescriptor> stores = bld.getStoresList(); + for (int j = 0; j < stores.size(); j++) { + totalHFileEntries += stores.get(j).getStoreFileList().size(); + } + } catch (IOException e) { + LOG.error("Failed to deserialize bulk load entry from wal edit. 
" + + "This its hfiles count will not be added into metric."); + } + } + if (!CellUtil.matchingRows(cells.get(i), lastCell)) { distinctRowKeys++; } lastCell = cells.get(i); } - return distinctRowKeys; + currentNbHFiles += totalHFileEntries; + return distinctRowKeys + totalHFileEntries; } /** @@ -914,6 +988,12 @@ public class ReplicationSource extends Thread } if (this.lastLoggedPosition != this.repLogReader.getPosition()) { + //Clean up hfile references + int size = entries.size(); + for (int i = 0; i < size; i++) { + cleanUpHFileRefs(entries.get(i).getEdit()); + } + //Log and clean up WAL logs manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); @@ -925,7 +1005,7 @@ public class ReplicationSource extends Thread totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedOperations.addAndGet(currentNbOperations); // FIXME check relationship between wal group and overall - metrics.shipBatch(currentNbOperations, currentSize / 1024); + metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles); metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 1e9c714..7f4a9f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.util.List; import java.util.UUID; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -26,7 +27,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -105,4 +108,14 @@ public interface ReplicationSourceInterface { */ String getStats(); + /** + * Add hfile names to the queue to be replicated. + * @param tableName Name of the table these files belongs to + * @param family Name of the family these files belong to + * @param files files whose names needs to be added to the queue to be replicated + * @throws ReplicationException If failed to add hfile references + */ + void addHFileRefs(TableName tableName, byte[] family, List<String> files) + throws ReplicationException; + }