Repository: hbase
Updated Branches:
  refs/heads/branch-1 b9df7978f -> 0964884b9


HBASE-15669 HFile size is not considered correctly in a replication request


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0964884b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0964884b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0964884b

Branch: refs/heads/branch-1
Commit: 0964884b925f251725bcd101f23f77a5d3d829e1
Parents: b9df797
Author: Ashish Singhi <ashishsin...@apache.org>
Authored: Fri May 6 17:28:06 2016 +0530
Committer: Ashish Singhi <ashishsin...@apache.org>
Committed: Fri May 6 17:28:06 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  12 +-
 .../hbase/protobuf/generated/WALProtos.java     | 159 ++++++++++++++++---
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../hadoop/hbase/regionserver/HRegion.java      |  18 ++-
 .../regionserver/ReplicationSource.java         |  44 ++++-
 .../regionserver/TestReplicationSink.java       |   4 +-
 .../TestReplicationSourceManager.java           |  25 ++-
 7 files changed, 229 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 82f5f0d..08cf6fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -3128,13 +3128,16 @@ public final class ProtobufUtil {
   * @param tableName         The tableName into which the bulk load is being imported into.
   * @param encodedRegionName Encoded region name of the region which is being bulk loaded.
   * @param storeFiles        A set of store files of a column family are bulk loaded.
+   * @param storeFilesSize  Map of store files and their lengths
   * @param bulkloadSeqId     sequence ID (by a force flush) used to create bulk load hfile
   *                          name
    * @return The WAL log marker for bulk loads.
    */
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
-      ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles, long bulkloadSeqId) {
-    BulkLoadDescriptor.Builder desc = BulkLoadDescriptor.newBuilder()
+      ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    BulkLoadDescriptor.Builder desc =
+        BulkLoadDescriptor.newBuilder()
        .setTableName(ProtobufUtil.toProtoTableName(tableName))
        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
 
@@ -3143,7 +3146,10 @@ public final class ProtobufUtil {
           .setFamilyName(ByteStringer.wrap(entry.getKey()))
          .setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
       for (Path path : entry.getValue()) {
-        builder.addStoreFile(path.getName());
+        String name = path.getName();
+        builder.addStoreFile(name);
+        Long size = storeFilesSize.get(name) == null ? (Long) 0L : storeFilesSize.get(name);
+        builder.setStoreFileSize(size);
       }
       desc.addStores(builder);
     }

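The net effect of the ProtobufUtil change: toBulkLoadDescriptor() now threads a
file-name-to-length map through to the WAL marker, substituting 0L for any file
whose length is unknown. A minimal sketch of the new call shape (the table name,
family, region bytes, file name, and length below are hypothetical, not taken
from the patch):

    Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    Map<String, Long> storeFilesSize = new HashMap<String, Long>();
    Path hfile = new Path("example-hfile");                 // hypothetical file name
    storeFiles.put(Bytes.toBytes("f1"), Collections.singletonList(hfile));
    storeFilesSize.put(hfile.getName(), 1024L);             // length as reported by the filesystem
    WALProtos.BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(
        TableName.valueOf("t1"),                            // hypothetical table
        ByteStringer.wrap(Bytes.toBytes("region")),         // hypothetical encoded region name
        storeFiles, storeFilesSize, 1L);                    // bulkloadSeqId

Callers that cannot determine a file's length may simply omit it from the map;
the null check in the loop above degrades to 0L rather than failing.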
http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index d74688e..6252d51 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -7821,6 +7821,24 @@ public final class WALProtos {
      */
     com.google.protobuf.ByteString
         getStoreFileBytes(int index);
+
+    // optional uint64 store_file_size = 4;
+    /**
+     * <code>optional uint64 store_file_size = 4;</code>
+     *
+     * <pre>
+     * size of store file
+     * </pre>
+     */
+    boolean hasStoreFileSize();
+    /**
+     * <code>optional uint64 store_file_size = 4;</code>
+     *
+     * <pre>
+     * size of store file
+     * </pre>
+     */
+    long getStoreFileSize();
   }
   /**
    * Protobuf type {@code hbase.pb.StoreDescriptor}
@@ -7891,6 +7909,11 @@ public final class WALProtos {
               storeFile_.add(input.readBytes());
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000004;
+              storeFileSize_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8051,10 +8074,35 @@ public final class WALProtos {
       return storeFile_.getByteString(index);
     }
 
+    // optional uint64 store_file_size = 4;
+    public static final int STORE_FILE_SIZE_FIELD_NUMBER = 4;
+    private long storeFileSize_;
+    /**
+     * <code>optional uint64 store_file_size = 4;</code>
+     *
+     * <pre>
+     * size of store file
+     * </pre>
+     */
+    public boolean hasStoreFileSize() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional uint64 store_file_size = 4;</code>
+     *
+     * <pre>
+     * size of store file
+     * </pre>
+     */
+    public long getStoreFileSize() {
+      return storeFileSize_;
+    }
+
     private void initFields() {
       familyName_ = com.google.protobuf.ByteString.EMPTY;
       storeHomeDir_ = "";
       storeFile_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      storeFileSize_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8085,6 +8133,9 @@ public final class WALProtos {
       for (int i = 0; i < storeFile_.size(); i++) {
         output.writeBytes(3, storeFile_.getByteString(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeUInt64(4, storeFileSize_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -8111,6 +8162,10 @@ public final class WALProtos {
         size += dataSize;
         size += 1 * getStoreFileList().size();
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(4, storeFileSize_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -8146,6 +8201,11 @@ public final class WALProtos {
       }
       result = result && getStoreFileList()
           .equals(other.getStoreFileList());
+      result = result && (hasStoreFileSize() == other.hasStoreFileSize());
+      if (hasStoreFileSize()) {
+        result = result && (getStoreFileSize()
+            == other.getStoreFileSize());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -8171,6 +8231,10 @@ public final class WALProtos {
         hash = (37 * hash) + STORE_FILE_FIELD_NUMBER;
         hash = (53 * hash) + getStoreFileList().hashCode();
       }
+      if (hasStoreFileSize()) {
+        hash = (37 * hash) + STORE_FILE_SIZE_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getStoreFileSize());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -8286,6 +8350,8 @@ public final class WALProtos {
         bitField0_ = (bitField0_ & ~0x00000002);
         storeFile_ = com.google.protobuf.LazyStringArrayList.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000004);
+        storeFileSize_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -8328,6 +8394,10 @@ public final class WALProtos {
           bitField0_ = (bitField0_ & ~0x00000004);
         }
         result.storeFile_ = storeFile_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.storeFileSize_ = storeFileSize_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -8362,6 +8432,9 @@ public final class WALProtos {
           }
           onChanged();
         }
+        if (other.hasStoreFileSize()) {
+          setStoreFileSize(other.getStoreFileSize());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8660,6 +8733,55 @@ public final class WALProtos {
         return this;
       }
 
+      // optional uint64 store_file_size = 4;
+      private long storeFileSize_ ;
+      /**
+       * <code>optional uint64 store_file_size = 4;</code>
+       *
+       * <pre>
+       * size of store file
+       * </pre>
+       */
+      public boolean hasStoreFileSize() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional uint64 store_file_size = 4;</code>
+       *
+       * <pre>
+       * size of store file
+       * </pre>
+       */
+      public long getStoreFileSize() {
+        return storeFileSize_;
+      }
+      /**
+       * <code>optional uint64 store_file_size = 4;</code>
+       *
+       * <pre>
+       * size of store file
+       * </pre>
+       */
+      public Builder setStoreFileSize(long value) {
+        bitField0_ |= 0x00000008;
+        storeFileSize_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 store_file_size = 4;</code>
+       *
+       * <pre>
+       * size of store file
+       * </pre>
+       */
+      public Builder clearStoreFileSize() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        storeFileSize_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.StoreDescriptor)
     }
 
@@ -11877,24 +11999,25 @@ public final class WALProtos {
       "ome_dir\030\002 \002(\t\022\024\n\014flush_output\030\003 
\003(\t\"S\n\013F" +
       "lushAction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FL" +
       
"USH\020\001\022\017\n\013ABORT_FLUSH\020\002\022\020\n\014CANNOT_FLUSH\020\003",
-      "\"R\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014" +
+      "\"k\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014" +
       "\022\026\n\016store_home_dir\030\002 
\002(\t\022\022\n\nstore_file\030\003" +
-      " \003(\t\"\237\001\n\022BulkLoadDescriptor\022\'\n\ntable_nam" +
-      "e\030\001 \002(\0132\023.hbase.pb.TableName\022\033\n\023encoded_" +
-      "region_name\030\002 \002(\014\022)\n\006stores\030\003 
\003(\0132\031.hbas" +
-      "e.pb.StoreDescriptor\022\030\n\020bulkload_seq_num" +
-      "\030\004 \002(\003\"\272\002\n\025RegionEventDescriptor\022=\n\neven" +
-      "t_type\030\001 \002(\0162).hbase.pb.RegionEventDescr" +
-      "iptor.EventType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023e" 
+
-      "ncoded_region_name\030\003 \002(\014\022\033\n\023log_sequence",
-      "_number\030\004 \001(\004\022)\n\006stores\030\005 
\003(\0132\031.hbase.pb" +
-      ".StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.hbase" +
-      ".pb.ServerName\022\023\n\013region_name\030\007 \001(\014\".\n\tE" +
-      "ventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOS" +
-      "E\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" +
-      "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" +
-      "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" +
-      "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      " \003(\t\022\027\n\017store_file_size\030\004 
\001(\004\"\237\001\n\022BulkLo" +
+      "adDescriptor\022\'\n\ntable_name\030\001 \002(\0132\023.hbase" +
+      ".pb.TableName\022\033\n\023encoded_region_name\030\002 \002" +
+      "(\014\022)\n\006stores\030\003 \003(\0132\031.hbase.pb.StoreDescr" +
+      "iptor\022\030\n\020bulkload_seq_num\030\004 
\002(\003\"\272\002\n\025Regi" +
+      "onEventDescriptor\022=\n\nevent_type\030\001 \002(\0162)." +
+      "hbase.pb.RegionEventDescriptor.EventType" +
+      "\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_n",
+      "ame\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 
\001(\004\022)" +
+      "\n\006stores\030\005 \003(\0132\031.hbase.pb.StoreDescripto" +
+      "r\022$\n\006server\030\006 \001(\0132\024.hbase.pb.ServerName\022" +
+      "\023\n\013region_name\030\007 
\001(\014\".\n\tEventType\022\017\n\013REG" +
+      "ION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrail" +
+      "er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" +
+      "AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" +
+      "g.apache.hadoop.hbase.protobuf.generated" +
+      "B\tWALProtosH\001\210\001\000\240\001\001"
     };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -11942,7 +12065,7 @@ public final class WALProtos {
           internal_static_hbase_pb_StoreDescriptor_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_StoreDescriptor_descriptor,
-              new java.lang.String[] { "FamilyName", "StoreHomeDir", "StoreFile", });
+              new java.lang.String[] { "FamilyName", "StoreHomeDir", "StoreFile", "StoreFileSize", });
           internal_static_hbase_pb_BulkLoadDescriptor_descriptor =
             getDescriptor().getMessageTypes().get(6);
           internal_static_hbase_pb_BulkLoadDescriptor_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index cb9bd8f..aeb8a16 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -132,6 +132,7 @@ message StoreDescriptor {
   required bytes family_name = 1;
   required string store_home_dir = 2; //relative to region dir
   repeated string store_file = 3; // relative to store dir
+  optional uint64 store_file_size = 4; // size of store file
 }
 
 /**

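Because store_file_size is declared optional, descriptors written before this
change still parse: hasStoreFileSize() returns false and getStoreFileSize()
returns the default 0. A sketch of reading the field defensively, assuming
"bld" is an already-parsed BulkLoadDescriptor:

    long total = 0;
    for (WALProtos.StoreDescriptor store : bld.getStoresList()) {
      // Unset on pre-upgrade WAL entries; getStoreFileSize() then yields 0.
      total += store.hasStoreFileSize() ? store.getStoreFileSize() : 0L;
    }

Note that store_file is repeated while store_file_size is singular, so
setStoreFileSize() in the ProtobufUtil loop records one size per
StoreDescriptor even when a family bulk-loads several files.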
http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 16cfe6a..204c729 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5408,6 +5408,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       BulkLoadListener bulkLoadListener) throws IOException {
     long seqId = -1;
    Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+    Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
     Preconditions.checkNotNull(familyPaths);
     // we need writeLock for multi-family bulk load
     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@@ -5490,6 +5491,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
           Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
 
+          // Note the size of the store file
+          try {
+            FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
+            storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
+                .getLen());
+          } catch (IOException e) {
+            LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
+            storeFilesSizes.put(commitedStoreFile.getName(), 0L);
+          }
+
           if(storeFiles.containsKey(familyName)) {
             storeFiles.get(familyName).add(commitedStoreFile);
           } else {
@@ -5524,9 +5535,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (wal != null && !storeFiles.isEmpty()) {
         // write a bulk load event when not all hfiles are loaded
         try {
-          WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
-              this.getRegionInfo().getTable(),
-              ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
+          WALProtos.BulkLoadDescriptor loadDescriptor =
+              ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
+                ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles,
+                storeFilesSizes, seqId);
           WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {

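The size lookup added above deliberately swallows the IOException and records
0L, so a filesystem metadata hiccup never fails the bulk load itself. Reduced
to its core (a sketch; "conf", "LOG", and the committed-file Path are assumed
to be in scope):

    long len;
    try {
      FileSystem fs = committedFile.getFileSystem(conf);
      len = fs.getFileStatus(committedFile).getLen();   // actual on-disk length
    } catch (IOException e) {
      LOG.warn("Failed to find the size of hfile " + committedFile, e);
      len = 0L;                                         // degrade to 0, do not abort the load
    }

A file recorded as 0L here simply contributes nothing to the replication batch
size downstream.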
http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2ce2137..dc6cf36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -711,6 +711,7 @@ public class ReplicationSource extends Thread
             currentNbOperations += countDistinctRowKeys(edit);
             entries.add(entry);
             currentSize += entry.getEdit().heapSize();
+            currentSize += calculateTotalSizeOfStoreFiles(edit);
           } else {
             metrics.incrLogEditsFiltered();
           }
@@ -737,6 +738,35 @@ public class ReplicationSource extends Thread
       return seenEntries == 0 && processEndOfFile();
     }
 
+    /**
+     * Calculate the total size of all the store files
+     * @param edit edit to sum store file sizes from
+     * @return the total size of the store files
+     */
+    private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
+      List<Cell> cells = edit.getCells();
+      int totalStoreFilesSize = 0;
+
+      int totalCells = edit.size();
+      for (int i = 0; i < totalCells; i++) {
+        if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+          try {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+            List<StoreDescriptor> stores = bld.getStoresList();
+            int totalStores = stores.size();
+            for (int j = 0; j < totalStores; j++) {
+              totalStoreFilesSize += stores.get(j).getStoreFileSize();
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to deserialize bulk load entry from wal edit. "
+                + "Size of HFiles part of cell will not be considered in 
replication "
+                + "request size calculation.", e);
+          }
+        }
+      }
+      return totalStoreFilesSize;
+    }
+
     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
       String peerId = peerClusterZnode;
       if (peerId.contains("-")) {
@@ -745,12 +775,14 @@ public class ReplicationSource extends Thread
         peerId = peerClusterZnode.split("-")[0];
       }
       List<Cell> cells = edit.getCells();
-      for (int i = 0; i < cells.size(); i++) {
+      int totalCells = cells.size();
+      for (int i = 0; i < totalCells; i++) {
         Cell cell = cells.get(i);
         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
           List<StoreDescriptor> stores = bld.getStoresList();
-          for (int j = 0; j < stores.size(); j++) {
+          int totalStores = stores.size();
+          for (int j = 0; j < totalStores; j++) {
             List<String> storeFileList = stores.get(j).getStoreFileList();
             manager.cleanUpHFileRefs(peerId, storeFileList);
             metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
@@ -933,18 +965,20 @@ public class ReplicationSource extends Thread
       int totalHFileEntries = 0;
       Cell lastCell = cells.get(0);
       
-      for (int i = 0; i < edit.size(); i++) {
+      int totalCells = edit.size();
+      for (int i = 0; i < totalCells; i++) {
         // Count HFiles to be replicated
         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
           try {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
             List<StoreDescriptor> stores = bld.getStoresList();
-            for (int j = 0; j < stores.size(); j++) {
+            int totalStores = stores.size();
+            for (int j = 0; j < totalStores; j++) {
               totalHFileEntries += stores.get(j).getStoreFileList().size();
             }
           } catch (IOException e) {
             LOG.error("Failed to deserialize bulk load entry from wal edit. "
-                + "This its hfiles count will not be added into metric.");
+                + "Then its hfiles count will not be added into metric.");
           }
         }
 

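The practical effect of the ReplicationSource change: a shipped batch's size
now counts bulk-loaded HFile bytes on top of WALEdit heap size, so the
size-capacity check reflects the real replication payload. Roughly (a sketch
reusing the patch's private helper by name):

    long batchSize = 0;
    for (WAL.Entry entry : entries) {
      batchSize += entry.getEdit().heapSize();                      // as before
      batchSize += calculateTotalSizeOfStoreFiles(entry.getEdit()); // added by this patch
    }

One caveat visible in the helper itself: it accumulates into an int, so store
files summing past Integer.MAX_VALUE within a single edit would overflow.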
http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index e148344..79c5bec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -286,6 +286,7 @@ public class TestReplicationSink {
     }
     List<Integer> numberList = new ArrayList<>(numbers);
     Collections.sort(numberList);
+    Map<String, Long> storeFilesSize = new HashMap<String, Long>(1);
 
     // 2. Create 25 hfiles
     Configuration conf = TEST_UTIL.getConfiguration();
@@ -296,6 +297,7 @@ public class TestReplicationSink {
       HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
       p.add(hfilePath);
+      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
     }
 
     // 3. Create a BulkLoadDescriptor and a WALEdit
@@ -309,7 +311,7 @@ public class TestReplicationSink {
      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
      loadDescriptor =
          ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
-            ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1);
+            ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
       edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
     }
     List<WALEntry> entries = new ArrayList<WALEntry>(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0964884b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 6b945d2..0bd2208 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -464,17 +465,33 @@ public class TestReplicationSourceManager {
   private WALEdit getBulkLoadWALEdit() {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    Map<String, Long> storeFilesSize = new HashMap<>(1);
     List<Path> p = new ArrayList<>(1);
-    p.add(new Path(Bytes.toString(f1)));
+    Path hfilePath1 = new Path(Bytes.toString(f1));
+    p.add(hfilePath1);
+    try {
+      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
+    } catch (IOException e) {
+      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
+      storeFilesSize.put(hfilePath1.getName(), 0L);
+    }
     storeFiles.put(f1, p);
 
     p = new ArrayList<>(1);
-    p.add(new Path(Bytes.toString(f2)));
+    Path hfilePath2 = new Path(Bytes.toString(f2));
+    p.add(hfilePath2);
+    try {
+      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
+    } catch (IOException e) {
+      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
+      storeFilesSize.put(hfilePath2.getName(), 0L);
+    }
     storeFiles.put(f2, p);
 
     // 2. Create bulk load descriptor
-    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
-      ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
+    BulkLoadDescriptor desc =
+        ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
+          ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
 
     // 3. create bulk load wal edit event
     WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
