This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 22ed8fb  Revert "HBASE-22380 break circle replication when doing bulkload"
22ed8fb is described below

commit 22ed8fbcacc34732a4a4a08b5216b192c5747210
Author: Wellington Chevreuil <wchevre...@apache.org>
AuthorDate: Mon Sep 23 18:06:41 2019 +0100

    Revert "HBASE-22380 break circle replication when doing bulkload"
    
    This reverts commit 11f649eefd31ba5d875a0a918313c90663a81129.
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   9 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  10 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  |  10 +-
 .../replication/regionserver/HFileReplicator.java  |   5 +-
 .../replication/regionserver/ReplicationSink.java  |  44 +--
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |   8 +-
 .../regionserver/TestBulkLoadReplication.java      | 330 ---------------------
 7 files changed, 23 insertions(+), 393 deletions(-)
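
For context on what is being reverted: HBASE-22380 broke circular bulk load
replication in active-active topologies by carrying, with every
BulkLoadHFileRequest, the list of cluster ids that had already handled the
event. A minimal sketch of the guard this revert removes (names taken verbatim
from the RSRpcServices.java hunk below):

    // Guard removed by this revert: skip the bulk load if this cluster
    // already handled the event, otherwise record this cluster and proceed.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.regionServer.clusterId)) {
      // Already replicated through here once; ack as loaded to stop the cycle.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.regionServer.clusterId);
    }

After the revert, branch-2.1 no longer propagates cluster ids with bulk load
events, so the WAL bulk load descriptor and ReplicationSink drop their
per-cluster-id bookkeeping, as the remaining hunks show.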

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e95e13e..f3b5a90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6046,7 +6046,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
   }
 
   /**
@@ -6091,13 +6091,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    * file about to be bulk loaded
    * @param copyFile always copy hfiles if true
-   * @param  clusterIds ids from clusters that had already handled the given bulkload event.
    * @return Map from family to List of store file paths if successful, null if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId, BulkLoadListener bulkLoadListener,
-        boolean copyFile, List<String> clusterIds) throws IOException {
+      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6272,7 +6270,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                   UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds);
+                  storeFiles,
+                storeFilesSizes, seqId);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index aa54876..2b3bec7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2305,12 +2305,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
-    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
-    if(clusterIds.contains(this.regionServer.clusterId)){
-      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
-    } else {
-      clusterIds.add(this.regionServer.clusterId);
-    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2343,7 +2337,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         try {
           map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile(), clusterIds);
+              request.getCopyFile());
         } finally {
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
@@ -2351,7 +2345,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       } else {
         // secure bulk load
-        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
+        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index f51608d..6b55744 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -212,12 +212,7 @@ public class SecureBulkLoadManager {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-    final BulkLoadHFileRequest request) throws IOException {
-    return secureBulkLoadHFiles(region, request, null);
-  }
-
-  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-      final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
+      final BulkLoadHFileRequest request) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -293,8 +288,7 @@ public class SecureBulkLoadManager {
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-              clusterIds);
+                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index c7fed77..ab9a236 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -87,19 +87,17 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
-  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      Connection connection, List<String> sourceClusterIds) throws IOException {
+      Connection connection) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
-    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -130,7 +128,6 @@ public class HFileReplicator {
       LoadIncrementalHFiles loadHFiles = null;
       try {
         loadHFiles = new LoadIncrementalHFiles(conf);
-        loadHFiles.setClusterIds(sourceClusterIds);
       } catch (Exception e) {
         LOG.error("Failed to initialize LoadIncrementalHFiles for replicating 
bulk loaded"
             + " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 8079adc..fb4e0f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -174,7 +174,9 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
-      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
+      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
+      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
+
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -202,19 +204,10 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
               bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -250,26 +243,14 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if(bulkLoadsPerClusters != null) {
-        for (Entry<List<String>, Map<String, List<Pair<byte[],
-          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
-          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
-          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
-                entry.getKey().toString());
-            }
-            HFileReplicator hFileReplicator =
-              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+        LOG.debug("Started replicating bulk loaded data.");
+        HFileReplicator hFileReplicator =
+            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection(), entry.getKey());
-            hFileReplicator.replicate();
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
-                entry.getKey().toString());
-            }
-          }
-        }
+                getConnection());
+        hFileReplicator.replicate();
+        LOG.debug("Finished replicating bulk loaded data.");
       }
 
       int size = entries.size();
@@ -284,7 +265,8 @@ public class ReplicationSink {
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      BulkLoadDescriptor bld) throws IOException {
+      Cell cell) throws IOException {
+    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 950f2a5..e4d5dcb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -137,8 +137,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private String bulkToken;
 
-  private List<String> clusterIds = new ArrayList<>();
-
   /**
   * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the HFile
@@ -541,7 +539,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           try (Table table = conn.getTable(getTableName())) {
             secureClient = new SecureBulkLoadClient(getConf(), table);
            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
           }
           return success ? regionName : null;
         } finally {
@@ -1253,10 +1251,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.bulkToken = stagingDir;
   }
 
-  public void setClusterIds(List<String> clusterIds) {
-    this.clusterIds = clusterIds;
-  }
-
   /**
    * Infers region boundaries for a new table.
    * <p>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
deleted file mode 100644
index 4d69bdf..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellBuilder;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.TestReplicationBase;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Integration test for bulk load replication. Defines three clusters, with the following
- * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
- * 2 and 3).
- *
- * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file
- * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication
- * topology all these bulk loads should get replicated only once on each peer. To assert this,
- * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the
- * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
- * we are not entering the infinite loop condition addressed by HBASE-22380.
- */
-@Category({ ReplicationTests.class, MediumTests.class})
-public class TestBulkLoadReplication extends TestReplicationBase {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
-
-  protected static final Logger LOG =
-    LoggerFactory.getLogger(TestBulkLoadReplication.class);
-
-  private static final String PEER1_CLUSTER_ID = "peer1";
-  private static final String PEER4_CLUSTER_ID = "peer4";
-  private static final String PEER3_CLUSTER_ID = "peer3";
-
-  private static final String PEER_ID1 = "1";
-  private static final String PEER_ID3 = "3";
-  private static final String PEER_ID4 = "4";
-
-  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
-  private static CountDownLatch BULK_LOAD_LATCH;
-
-  private static HBaseTestingUtility utility3;
-  private static HBaseTestingUtility utility4;
-  private static Configuration conf3;
-  private static Configuration conf4;
-  private static Table htable3;
-  private static Table htable4;
-
-  @Rule
-  public TestName name = new TestName();
-
-  @ClassRule
-  public static TemporaryFolder testFolder = new TemporaryFolder();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
-    conf3 = HBaseConfiguration.create(conf1);
-    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
-    utility3 = new HBaseTestingUtility(conf3);
-    conf4 = HBaseConfiguration.create(conf1);
-    conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
-    utility3 = new HBaseTestingUtility(conf3);
-    utility4 = new HBaseTestingUtility(conf4);
-    TestReplicationBase.setUpBeforeClass();
-    setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
-    //utility4 is started within TestReplicationBase.setUpBeforeClass(), but we had not set
-    //bulkload replication configs yet, so setting a 4th utility.
-    setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
-    startCluster(utility3, conf3);
-    startCluster(utility4, conf4);
-  }
-
-  private static void startCluster(HBaseTestingUtility util, Configuration configuration)
-      throws Exception {
-    LOG.info("Setup Zk to same one from utility1 and utility4");
-    util.setZkCluster(utility1.getZkCluster());
-    util.startMiniCluster(2);
-
-    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
-        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
-
-    Connection connection = ConnectionFactory.createConnection(configuration);
-    try (Admin admin = connection.getAdmin()) {
-      admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-    }
-    util.waitUntilAllRegionsAssigned(tableName);
-  }
-
-  @Before
-  @Override
-  public void setUpBase() throws Exception {
-    super.setUpBase();
-    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
-    ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
-    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
-    //adds cluster4 as a remote peer on cluster1
-    utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
-    //adds cluster1 as a remote peer on cluster4
-    utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
-    //adds cluster3 as a remote peer on cluster4
-    utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
-    //adds cluster4 as a remote peer on cluster3
-    utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
-    setupCoprocessor(utility1);
-    setupCoprocessor(utility4);
-    setupCoprocessor(utility3);
-  }
-
-  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
-    return ReplicationPeerConfig.newBuilder()
-      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
-  }
-
-  private void setupCoprocessor(HBaseTestingUtility cluster){
-    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
-      try {
-        r.getCoprocessorHost()
-          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
-            cluster.getConfiguration());
-      } catch (Exception e){
-        LOG.error(e.getMessage(), e);
-      }
-    });
-  }
-
-  @After
-  @Override
-  public void tearDownBase() throws Exception {
-    super.tearDownBase();
-    utility4.getAdmin().removeReplicationPeer(PEER_ID1);
-    utility4.getAdmin().removeReplicationPeer(PEER_ID3);
-    utility3.getAdmin().removeReplicationPeer(PEER_ID4);
-  }
-
-  private static void setupBulkLoadConfigsForCluster(Configuration config,
-    String clusterReplicationId) throws Exception {
-    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
-    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
-    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
-    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
-      + "/hbase-site.xml");
-    config.writeXml(new FileOutputStream(sourceConfigFile));
-    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
-  }
-
-  @Test
-  public void testBulkLoadReplicationActiveActive() throws Exception {
-    Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
-    Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
-    Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
-    byte[] row = Bytes.toBytes("001");
-    byte[] value = Bytes.toBytes("v1");
-    assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
-    row = Bytes.toBytes("002");
-    value = Bytes.toBytes("v2");
-    assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable);
-    row = Bytes.toBytes("003");
-    value = Bytes.toBytes("v3");
-    assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable);
-    //Additional wait to make sure no extra bulk load happens
-    Thread.sleep(400);
-    //We have 3 bulk load events (1 initiated on each cluster).
-    //Each event gets 3 counts (the originator cluster, plus the two peers),
-    //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
-    assertEquals(9, BULK_LOADS_COUNT.get());
-  }
-
-  private void assertBulkLoadConditions(byte[] row, byte[] value,
-      HBaseTestingUtility utility, Table...tables) throws Exception {
-    BULK_LOAD_LATCH = new CountDownLatch(3);
-    bulkLoadOnCluster(row, value, utility);
-    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
-    assertTableHasValue(tables[0], row, value);
-    assertTableHasValue(tables[1], row, value);
-    assertTableHasValue(tables[2], row, value);
-  }
-
-  private void bulkLoadOnCluster(byte[] row, byte[] value,
-      HBaseTestingUtility cluster) throws Exception {
-    String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
-    Path bulkLoadFilePath = new Path(bulkLoadFile);
-    copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
-    LoadIncrementalHFiles bulkLoadHFilesTool =
-      new LoadIncrementalHFiles(cluster.getConfiguration());
-    Map<byte[], List<Path>> family2Files = new HashMap<>();
-    List<Path> files = new ArrayList<>();
-    files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
-    family2Files.put(Bytes.toBytes("f"), files);
-    bulkLoadHFilesTool.run(family2Files, tableName);
-  }
-
-  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
-    Path bulkLoadDir = new Path("/bulk_dir/f");
-    cluster.getFileSystem().mkdirs(bulkLoadDir);
-    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
-  }
-
-  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
-    Get get = new Get(row);
-    Result result = table.get(get);
-    assertTrue(result.advance());
-    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
-  }
-
-  private String createHFileForFamilies(byte[] row, byte[] value,
-      Configuration clusterConfig) throws IOException {
-    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
-    cellBuilder.setRow(row)
-      .setFamily(TestReplicationBase.famName)
-      .setQualifier(Bytes.toBytes("1"))
-      .setValue(value)
-      .setType(Cell.Type.Put);
-
-    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
-    // TODO We need a way to do this without creating files
-    File hFileLocation = testFolder.newFile();
-    FSDataOutputStream out =
-      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
-    try {
-      hFileFactory.withOutputStream(out);
-      hFileFactory.withFileContext(new HFileContext());
-      HFile.Writer writer = hFileFactory.create();
-      try {
-        writer.append(new KeyValue(cellBuilder.build()));
-      } finally {
-        writer.close();
-      }
-    } finally {
-      out.close();
-    }
-    return hFileLocation.getAbsoluteFile().getAbsolutePath();
-  }
-
-  public static class BulkReplicationTestObserver implements RegionCoprocessor {
-
-    @Override
-    public Optional<RegionObserver> getRegionObserver() {
-      return Optional.of(new RegionObserver() {
-        @Override
-        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
-          List<Pair<byte[], String>> familyPaths) throws IOException {
-            BULK_LOADS_COUNT.incrementAndGet();
-        }
-
-        @Override
-        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
-          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
-            throws IOException {
-          BULK_LOAD_LATCH.countDown();
-        }
-      });
-    }
-  }
-}
