This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 11f649e  HBASE-22380 break circle replication when doing bulkload
11f649e is described below

commit 11f649eefd31ba5d875a0a918313c90663a81129
Author: Wellington Chevreuil <wchevre...@apache.org>
AuthorDate: Mon Sep 23 17:16:15 2019 +0100

    HBASE-22380 break circle replication when doing bulkload

    Signed-off-by: stack <st...@apache.org>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
    Signed-off-by: Norbert Kalmar <nkal...@cloudera.com>
    (cherry picked from commit 38c8bd37319325f97b1a6fe8a64c0c71683782b9)
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   9 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  10 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  |  10 +-
 .../replication/regionserver/HFileReplicator.java  |   5 +-
 .../replication/regionserver/ReplicationSink.java  |  44 ++-
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |   8 +-
 .../regionserver/TestBulkLoadReplication.java      | 330 +++++++++++++++++++++
 7 files changed, 393 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f3b5a90..e95e13e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6046,7 +6046,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6091,11 +6091,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    *        file about to be bulk loaded
    * @param copyFile always copy hfiles if true
+   * @param clusterIds ids from clusters that had already handled the given bulkload event.
    * @return Map from family to List of store file paths if successful, null if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+      boolean assignSeqId, BulkLoadListener bulkLoadListener,
+      boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6270,8 +6272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       WALProtos.BulkLoadDescriptor loadDescriptor =
           ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
               UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-              storeFiles,
-              storeFilesSizes, seqId);
+              storeFiles, storeFilesSizes, seqId, clusterIds);
       WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
           loadDescriptor, mvcc);
     } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 2b3bec7..aa54876 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2305,6 +2305,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+    if(clusterIds.contains(this.regionServer.clusterId)){
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2337,7 +2343,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         try {
           map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-            request.getCopyFile());
+            request.getCopyFile(), clusterIds);
         } finally {
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
@@ -2345,7 +2351,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       } else {
         // secure bulk load
-        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 6b55744..f51608d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -212,7 +212,12 @@ public class SecureBulkLoadManager {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-      final BulkLoadHFileRequest request) throws IOException {
+    final BulkLoadHFileRequest request) throws IOException {
+    return secureBulkLoadHFiles(region, request, null);
+  }
+
+  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+    final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -288,7 +293,8 @@ public class SecureBulkLoadManager {
           //We call bulkLoadHFiles as requesting user
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
-              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+              clusterIds);
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ab9a236..c7fed77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -87,17 +87,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      Connection connection) throws IOException {
+      Connection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public class HFileReplicator {
     LoadIncrementalHFiles loadHFiles = null;
     try {
       loadHFiles = new LoadIncrementalHFiles(conf);
+      loadHFiles.setClusterIds(sourceClusterIds);
     } catch (Exception e) {
       LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
           + " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index fb4e0f9..8079adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -174,9 +174,7 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -204,10 +202,19 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if(bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
               bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -243,14 +250,26 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if(bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[],
+          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-            getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
+                entry.getKey().toString());
+            }
+          }
+        }
       }
 
       int size = entries.size();
@@ -265,8 +284,7 @@ public class ReplicationSink {
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index e4d5dcb..950f2a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -137,6 +137,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private String bulkToken;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -539,7 +541,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         try (Table table = conn.getTable(getTableName())) {
           secureClient = new SecureBulkLoadClient(getConf(), table);
           success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-            assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+            assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
         }
         return success ? regionName : null;
       } finally {
@@ -1251,6 +1253,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.bulkToken = stagingDir;
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   /**
    * Infers region boundaries for a new table.
    * <p>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
new file mode 100644
index 0000000..4d69bdf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file
+ * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication
+ * topology all these bulk loads should get replicated only once on each peer. To assert this,
+ * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the
+ * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
+ * we are not entering the infinite loop condition addressed by HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+  private static final String PEER1_CLUSTER_ID = "peer1";
+  private static final String PEER4_CLUSTER_ID = "peer4";
+  private static final String PEER3_CLUSTER_ID = "peer3";
+
+  private static final String PEER_ID1 = "1";
+  private static final String PEER_ID3 = "3";
+  private static final String PEER_ID4 = "4";
+
+  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static CountDownLatch BULK_LOAD_LATCH;
+
+  private static HBaseTestingUtility utility3;
+  private static HBaseTestingUtility utility4;
+  private static Configuration conf3;
+  private static Configuration conf4;
+  private static Table htable3;
+  private static Table htable4;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
+    conf3 = HBaseConfiguration.create(conf1);
+    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+    utility3 = new HBaseTestingUtility(conf3);
+    conf4 = HBaseConfiguration.create(conf1);
+    conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
+    utility3 = new HBaseTestingUtility(conf3);
+    utility4 = new HBaseTestingUtility(conf4);
+    TestReplicationBase.setUpBeforeClass();
+    setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
+    //utility4 is started within TestReplicationBase.setUpBeforeClass(), but we had not set
+    //bulkload replication configs yet, so setting a 4th utility.
+    setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
+    startCluster(utility3, conf3);
+    startCluster(utility4, conf4);
+  }
+
+  private static void startCluster(HBaseTestingUtility util, Configuration configuration)
+    throws Exception {
+    LOG.info("Setup Zk to same one from utility1 and utility4");
+    util.setZkCluster(utility1.getZkCluster());
+    util.startMiniCluster(2);
+
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    Connection connection = ConnectionFactory.createConnection(configuration);
+    try (Admin admin = connection.getAdmin()) {
+      admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    util.waitUntilAllRegionsAssigned(tableName);
+  }
+
+  @Before
+  @Override
+  public void setUpBase() throws Exception {
+    super.setUpBase();
+    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
+    ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
+    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
+    //adds cluster4 as a remote peer on cluster1
+    utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
+    //adds cluster1 as a remote peer on cluster4
+    utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+    //adds cluster3 as a remote peer on cluster4
+    utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+    //adds cluster4 as a remote peer on cluster3
+    utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
+    setupCoprocessor(utility1);
+    setupCoprocessor(utility4);
+    setupCoprocessor(utility3);
+  }
+
+  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+    return ReplicationPeerConfig.newBuilder()
+      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
+  }
+
+  private void setupCoprocessor(HBaseTestingUtility cluster){
+    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
+      try {
+        r.getCoprocessorHost()
+          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+            cluster.getConfiguration());
+      } catch (Exception e){
+        LOG.error(e.getMessage(), e);
+      }
+    });
+  }
+
+  @After
+  @Override
+  public void tearDownBase() throws Exception {
+    super.tearDownBase();
+    utility4.getAdmin().removeReplicationPeer(PEER_ID1);
+    utility4.getAdmin().removeReplicationPeer(PEER_ID3);
+    utility3.getAdmin().removeReplicationPeer(PEER_ID4);
+  }
+
+  private static void setupBulkLoadConfigsForCluster(Configuration config,
+    String clusterReplicationId) throws Exception {
+    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() +
+      "/hbase-site.xml");
+    config.writeXml(new FileOutputStream(sourceConfigFile));
+    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+  }
+
+  @Test
+  public void testBulkLoadReplicationActiveActive() throws Exception {
+    Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("001");
+    byte[] value = Bytes.toBytes("v1");
+    assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
+    row = Bytes.toBytes("002");
+    value = Bytes.toBytes("v2");
+    assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable);
+    row = Bytes.toBytes("003");
+    value = Bytes.toBytes("v3");
+    assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable);
+    //Additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    //We have 3 bulk load events (1 initiated on each cluster).
+    //Each event gets 3 counts (the originator cluster, plus the two peers),
+    //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+    assertEquals(9, BULK_LOADS_COUNT.get());
+  }
+
+  private void assertBulkLoadConditions(byte[] row, byte[] value,
+      HBaseTestingUtility utility, Table...tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(3);
+    bulkLoadOnCluster(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableHasValue(tables[1], row, value);
+    assertTableHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnCluster(byte[] row, byte[] value,
+      HBaseTestingUtility cluster) throws Exception {
+    String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
+    Path bulkLoadFilePath = new Path(bulkLoadFile);
+    copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
+    LoadIncrementalHFiles bulkLoadHFilesTool =
+      new LoadIncrementalHFiles(cluster.getConfiguration());
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    List<Path> files = new ArrayList<>();
+    files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+    family2Files.put(Bytes.toBytes("f"), files);
+    bulkLoadHFilesTool.run(family2Files, tableName);
+  }
+
+  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+    Path bulkLoadDir = new Path("/bulk_dir/f");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.advance());
+    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
+
+  private String createHFileForFamilies(byte[] row, byte[] value,
+      Configuration clusterConfig) throws IOException {
+    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row)
+      .setFamily(TestReplicationBase.famName)
+      .setQualifier(Bytes.toBytes("1"))
+      .setValue(value)
+      .setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(new RegionObserver() {
+        @Override
+        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+          List<Pair<byte[], String>> familyPaths) throws IOException {
+          BULK_LOADS_COUNT.incrementAndGet();
+        }
+
+        @Override
+        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+            throws IOException {
+          BULK_LOAD_LATCH.countDown();
+        }
+      });
+    }
+  }
+}
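
For readers skimming the patch above, the loop-breaking idea is compact: each bulk-load event carries the list of cluster ids that have already applied it (propagated via the bulk load request and the WAL bulk-load marker), a receiving cluster skips the event if its own id is already in that list, and otherwise appends its id before the event is re-replicated. The sketch below is a minimal, self-contained model of that check for illustration only; the class and method names are hypothetical and are not part of the HBase API.

import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the HBASE-22380 cycle-breaking check; names here are hypothetical. */
public class BulkLoadCycleBreakerSketch {

  /**
   * Returns the cluster-id list to propagate with a bulk-load event, or null when the event
   * has already been handled by this cluster and should be dropped.
   */
  static List<String> clusterIdsToPropagate(String localClusterId, List<String> incomingClusterIds) {
    if (incomingClusterIds.contains(localClusterId)) {
      // The event looped back to a cluster that already applied it: stop here, mirroring the
      // early "loaded = true" return added to RSRpcServices.bulkLoadHFile in the patch above.
      return null;
    }
    List<String> updated = new ArrayList<>(incomingClusterIds);
    // Record this cluster before the WAL bulk-load marker is written, so downstream peers see it.
    updated.add(localClusterId);
    return updated;
  }

  public static void main(String[] args) {
    // An event originating on cluster A in an "A <-> B <-> C" topology.
    List<String> ids = new ArrayList<>();
    ids = clusterIdsToPropagate("A", ids);                // applied on A -> [A]
    ids = clusterIdsToPropagate("B", ids);                // applied on B -> [A, B]
    ids = clusterIdsToPropagate("C", ids);                // applied on C -> [A, B, C]
    System.out.println(clusterIdsToPropagate("B", ids));  // null: B already handled it, cycle broken
  }
}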