This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new d52b098  HBASE-22380 break circle replication when doing bulkload
d52b098 is described below

commit d52b0982cca63e0e43a52fb7b7eb480c2ac9d409
Author: Wellington Chevreuil <wchevre...@apache.org>
AuthorDate: Mon Sep 23 18:15:09 2019 +0100

    HBASE-22380 break circle replication when doing bulkload
    
    Signed-off-by:  stack <st...@apache.org>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
    Signed-off-by: Norbert Kalmar <nkal...@cloudera.com>
    (cherry picked from commit 38c8bd37319325f97b1a6fe8a64c0c71683782b9)
---
 .../hadoop/hbase/client/SecureBulkLoadClient.java  |  18 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  12 +-
 .../hbase/shaded/protobuf/RequestConverter.java    |  11 +-
 .../src/main/protobuf/Client.proto                 |   1 +
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |   1 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   9 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  10 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  |  10 +-
 .../replication/regionserver/HFileReplicator.java  |   5 +-
 .../replication/regionserver/ReplicationSink.java  |  44 ++-
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |   8 +-
 .../regionserver/TestBulkLoadReplication.java      | 330 +++++++++++++++++++++
 12 files changed, 427 insertions(+), 32 deletions(-)
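
At a high level, the fix threads a list of cluster ids through every bulk load: the ids travel in the BulkLoadHFileRequest and in the WAL bulk load marker, each cluster appends its own id before forwarding the load, and a cluster that already appears in the list drops the event instead of re-applying it, which is what breaks the replication cycle. The following minimal, self-contained sketch (plain Java, not the committed code; the class name and cluster id values are illustrative) captures that rule:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustration of the HBASE-22380 idea: a bulk load event carries the ids of
    // clusters that already handled it; seeing our own id means the event has come
    // full circle and must be skipped.
    public class ClusterIdLoopGuardSketch {

      // Returns true if this cluster should apply (and forward) the bulk load.
      static boolean shouldApply(String localClusterId, List<String> clusterIds) {
        if (clusterIds.contains(localClusterId)) {
          return false; // already handled here once; break the cycle
        }
        clusterIds.add(localClusterId); // record ourselves before replicating downstream
        return true;
      }

      public static void main(String[] args) {
        List<String> ids = new ArrayList<>(Arrays.asList("cluster-1"));
        System.out.println(shouldApply("cluster-2", ids)); // true: cluster-2 appends itself
        System.out.println(shouldApply("cluster-1", ids)); // false: cycle detected
      }
    }
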

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 2186271..7e3166c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -115,8 +115,8 @@ public class SecureBulkLoadClient {
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
-    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false);
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
+      bulkToken, false, null);
   }
 
   /**
@@ -133,12 +133,22 @@ public class SecureBulkLoadClient {
    * @throws IOException
    */
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+    final List<Pair<byte[], String>> familyPaths,
+    final byte[] regionName, boolean assignSeqNum,
+    final Token<?> userToken, final String bulkToken,
+    boolean copyFiles) throws IOException {
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
+      bulkToken, false, null);
+  }
+
+  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
+      final Token<?> userToken, final String bulkToken,
+      boolean copyFiles, List<String> clusterIds) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles);
+          userToken, bulkToken, copyFiles, clusterIds);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index e0950e6..9410b4b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2577,12 +2577,22 @@ public final class ProtobufUtil {
    * @return The WAL log marker for bulk loads.
    */
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+    ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+    Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+      storeFilesSize, bulkloadSeqId, null);
+  }
+
+  public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
         .setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+    if(clusterIds != null) {
+      desc.addAllClusterIds(clusterIds);
+    }
 
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index c60a4c6..d2c818e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -562,7 +562,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false);
+        false, null);
   }
 
   /**
@@ -577,9 +577,9 @@ public final class RequestConverter {
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+      final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+        final Token<?> userToken, final String bulkToken, boolean copyFiles,
+          List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -617,6 +617,9 @@ public final class RequestConverter {
       request.setBulkToken(bulkToken);
     }
     request.setCopyFile(copyFiles);
+    if (clusterIds != null) {
+      request.addAllClusterIds(clusterIds);
+    }
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 14abb08..07d8d71 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated string cluster_ids = 7;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 08d4741..9020daf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -144,6 +144,7 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
+  repeated string cluster_ids = 5;
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index fe2b336..941b676 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6096,7 +6096,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6141,11 +6141,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    * file about to be bulk loaded
    * @param copyFile always copy hfiles if true
+   * @param  clusterIds ids from clusters that had already handled the given bulkload event.
    * @return Map from family to List of store file paths if successful, null if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+      boolean assignSeqId, BulkLoadListener bulkLoadListener,
+        boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6320,8 +6322,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                   UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles,
-                storeFilesSizes, seqId);
+                  storeFiles, storeFilesSizes, seqId, clusterIds);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
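
For replication through WAL entries, the provenance travels in the bulk load marker written above: the cluster ids handed to bulkLoadHFiles end up in the BulkLoadDescriptor via the new ProtobufUtil overload. Below is a hedged sketch of that call; the table, region name, file names, sizes and cluster id are placeholders, and the imports assume the HBase 2.2 shaded protobuf packages:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

    public class BulkLoadDescriptorSketch {
      public static void main(String[] args) {
        // Placeholder store file layout: one hfile under family "f".
        Map<byte[], List<Path>> storeFiles = new HashMap<>();
        storeFiles.put(Bytes.toBytes("f"), Arrays.asList(new Path("/bulk_dir/f/hfile-0001")));
        Map<String, Long> storeFilesSize = new HashMap<>();
        storeFilesSize.put("hfile-0001", 1024L);

        // The new trailing argument records which clusters have already applied this load.
        WALProtos.BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(
            TableName.valueOf("test-table"),
            ByteString.copyFromUtf8("encoded-region-name"),
            storeFiles, storeFilesSize, 42L,
            Arrays.asList("source-cluster-id"));

        System.out.println(desc.getClusterIdsList()); // [source-cluster-id]
      }
    }
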
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 87d817b..5b12430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2355,6 +2355,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+    if(clusterIds.contains(this.regionServer.clusterId)){
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2387,7 +2393,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         try {
           map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile());
+              request.getCopyFile(), clusterIds);
         } finally {
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
@@ -2395,7 +2401,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       } else {
         // secure bulk load
-        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 6b55744..f51608d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -212,7 +212,12 @@ public class SecureBulkLoadManager {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-      final BulkLoadHFileRequest request) throws IOException {
+    final BulkLoadHFileRequest request) throws IOException {
+    return secureBulkLoadHFiles(region, request, null);
+  }
+
+  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+      final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -288,7 +293,8 @@ public class SecureBulkLoadManager {
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+              clusterIds);
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ff54f41..9bbc16d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -87,17 +87,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      Connection connection) throws IOException {
+      Connection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public class HFileReplicator {
       LoadIncrementalHFiles loadHFiles = null;
       try {
         loadHFiles = new LoadIncrementalHFiles(conf);
+        loadHFiles.setClusterIds(sourceClusterIds);
       } catch (Exception e) {
         LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
             + " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 19f0707..6e68760 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -174,9 +174,7 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -204,10 +202,19 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if(bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
               bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -243,14 +250,26 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if(bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[],
+          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
+                entry.getKey().toString());
+            }
+          }
+        }
       }
 
       int size = entries.size();
@@ -265,8 +284,7 @@ public class ReplicationSink {
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
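
One HFileReplicator invocation can only forward a single provenance list, so the sink above buckets incoming bulk load entries by the cluster-id list found in their descriptors and replays each bucket separately. A minimal plain-Java sketch of that bucketing (illustrative values, not HBase code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupByProvenanceSketch {
      public static void main(String[] args) {
        // hfiles as they might be decoded from WAL bulk load markers, each tagged
        // with the clusters that already applied them (values are made up).
        Map<String, List<String>> hfileToClusterIds = new HashMap<>();
        hfileToClusterIds.put("hfile-a", Arrays.asList("c1"));
        hfileToClusterIds.put("hfile-b", Arrays.asList("c1", "c2"));
        hfileToClusterIds.put("hfile-c", Arrays.asList("c1"));

        // Bucket by provenance list, mirroring the Map<List<String>, ...> used above.
        Map<List<String>, List<String>> perProvenance = new HashMap<>();
        hfileToClusterIds.forEach((hfile, ids) ->
            perProvenance.computeIfAbsent(ids, k -> new ArrayList<>()).add(hfile));

        // Two buckets: [c1] -> [hfile-a, hfile-c] and [c1, c2] -> [hfile-b], each of
        // which would get its own replication run with its own provenance list.
        System.out.println(perProvenance);
      }
    }
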
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 69a3074..c5ba5a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -155,6 +155,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private String bulkToken;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -545,7 +547,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           try (Table table = conn.getTable(getTableName())) {
             secureClient = new SecureBulkLoadClient(getConf(), table);
             success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
           }
           return success ? regionName : null;
         } finally {
@@ -1260,6 +1262,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.bulkToken = stagingDir;
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   /**
    * Infers region boundaries for a new table.
    * <p>
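
On the receiving side of replication, the loader is told which clusters the data has already visited before the bulk load is re-played, so the ids keep propagating to further peers. A hedged usage sketch follows; the configuration and id value are placeholders, and an actual load would still need a running cluster and staged hfiles:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    public class ClusterIdsLoaderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Carry the source cluster's id forward; it ends up in the BulkLoadHFileRequest
        // and in the WAL bulk load marker written on this cluster.
        loader.setClusterIds(Arrays.asList("source-cluster-uuid"));
        // ... then run the load as usual, e.g. loader.run(family2Files, tableName);
      }
    }
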
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
new file mode 100644
index 0000000..4d69bdf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load, asserting that values from the
+ * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with the
+ * given replication topology all these bulk loads should get replicated only once on each peer. To
+ * assert this, this test defines a preBulkLoad coprocessor and adds it to all test table regions,
+ * on each of the clusters. This CP counts the number of times bulk load actually gets invoked,
+ * certifying that we are not entering the infinite loop condition addressed by HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
+
+  protected static final Logger LOG =
+    LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+  private static final String PEER1_CLUSTER_ID = "peer1";
+  private static final String PEER4_CLUSTER_ID = "peer4";
+  private static final String PEER3_CLUSTER_ID = "peer3";
+
+  private static final String PEER_ID1 = "1";
+  private static final String PEER_ID3 = "3";
+  private static final String PEER_ID4 = "4";
+
+  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static CountDownLatch BULK_LOAD_LATCH;
+
+  private static HBaseTestingUtility utility3;
+  private static HBaseTestingUtility utility4;
+  private static Configuration conf3;
+  private static Configuration conf4;
+  private static Table htable3;
+  private static Table htable4;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
+    conf3 = HBaseConfiguration.create(conf1);
+    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+    utility3 = new HBaseTestingUtility(conf3);
+    conf4 = HBaseConfiguration.create(conf1);
+    conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
+    utility3 = new HBaseTestingUtility(conf3);
+    utility4 = new HBaseTestingUtility(conf4);
+    TestReplicationBase.setUpBeforeClass();
+    setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
+    //utility4 is started within TestReplicationBase.setUpBeforeClass(), but we had not set
+    //bulkload replication configs yet, so setting a 4th utility.
+    setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
+    startCluster(utility3, conf3);
+    startCluster(utility4, conf4);
+  }
+
+  private static void startCluster(HBaseTestingUtility util, Configuration configuration)
+      throws Exception {
+    LOG.info("Setup Zk to same one from utility1 and utility4");
+    util.setZkCluster(utility1.getZkCluster());
+    util.startMiniCluster(2);
+
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    Connection connection = ConnectionFactory.createConnection(configuration);
+    try (Admin admin = connection.getAdmin()) {
+      admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    util.waitUntilAllRegionsAssigned(tableName);
+  }
+
+  @Before
+  @Override
+  public void setUpBase() throws Exception {
+    super.setUpBase();
+    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
+    ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
+    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
+    //adds cluster4 as a remote peer on cluster1
+    utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
+    //adds cluster1 as a remote peer on cluster4
+    utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+    //adds cluster3 as a remote peer on cluster4
+    utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+    //adds cluster4 as a remote peer on cluster3
+    utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
+    setupCoprocessor(utility1);
+    setupCoprocessor(utility4);
+    setupCoprocessor(utility3);
+  }
+
+  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+    return ReplicationPeerConfig.newBuilder()
+      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
+  }
+
+  private void setupCoprocessor(HBaseTestingUtility cluster){
+    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
+      try {
+        r.getCoprocessorHost()
+          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+            cluster.getConfiguration());
+      } catch (Exception e){
+        LOG.error(e.getMessage(), e);
+      }
+    });
+  }
+
+  @After
+  @Override
+  public void tearDownBase() throws Exception {
+    super.tearDownBase();
+    utility4.getAdmin().removeReplicationPeer(PEER_ID1);
+    utility4.getAdmin().removeReplicationPeer(PEER_ID3);
+    utility3.getAdmin().removeReplicationPeer(PEER_ID4);
+  }
+
+  private static void setupBulkLoadConfigsForCluster(Configuration config,
+    String clusterReplicationId) throws Exception {
+    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
+      + "/hbase-site.xml");
+    config.writeXml(new FileOutputStream(sourceConfigFile));
+    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+  }
+
+  @Test
+  public void testBulkLoadReplicationActiveActive() throws Exception {
+    Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("001");
+    byte[] value = Bytes.toBytes("v1");
+    assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
+    row = Bytes.toBytes("002");
+    value = Bytes.toBytes("v2");
+    assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable);
+    row = Bytes.toBytes("003");
+    value = Bytes.toBytes("v3");
+    assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable);
+    //Additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    //We have 3 bulk load events (1 initiated on each cluster).
+    //Each event gets 3 counts (the originator cluster, plus the two peers),
+    //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+    assertEquals(9, BULK_LOADS_COUNT.get());
+  }
+
+  private void assertBulkLoadConditions(byte[] row, byte[] value,
+      HBaseTestingUtility utility, Table...tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(3);
+    bulkLoadOnCluster(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableHasValue(tables[1], row, value);
+    assertTableHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnCluster(byte[] row, byte[] value,
+      HBaseTestingUtility cluster) throws Exception {
+    String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
+    Path bulkLoadFilePath = new Path(bulkLoadFile);
+    copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
+    LoadIncrementalHFiles bulkLoadHFilesTool =
+      new LoadIncrementalHFiles(cluster.getConfiguration());
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    List<Path> files = new ArrayList<>();
+    files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+    family2Files.put(Bytes.toBytes("f"), files);
+    bulkLoadHFilesTool.run(family2Files, tableName);
+  }
+
+  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+    Path bulkLoadDir = new Path("/bulk_dir/f");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.advance());
+    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
+
+  private String createHFileForFamilies(byte[] row, byte[] value,
+      Configuration clusterConfig) throws IOException {
+    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row)
+      .setFamily(TestReplicationBase.famName)
+      .setQualifier(Bytes.toBytes("1"))
+      .setValue(value)
+      .setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(new RegionObserver() {
+        @Override
+        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+          List<Pair<byte[], String>> familyPaths) throws IOException {
+            BULK_LOADS_COUNT.incrementAndGet();
+        }
+
+        @Override
+        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+            throws IOException {
+          BULK_LOAD_LATCH.countDown();
+        }
+      });
+    }
+  }
+}
