Repository: hbase Updated Branches: refs/heads/master e60a7f6e7 -> 2182ca34a
HBASE-16992 The usage of mutation from CP is weird. (ChiaPing Tsai) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2182ca34 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2182ca34 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2182ca34 Branch: refs/heads/master Commit: 2182ca34ac935edd126e7ed17eb3237772d9d18b Parents: e60a7f6 Author: anoopsamjohn <anoopsamj...@gmail.com> Authored: Mon Nov 7 23:24:34 2016 +0530 Committer: anoopsamjohn <anoopsamj...@gmail.com> Committed: Mon Nov 7 23:24:34 2016 +0530 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 9 +-- .../MiniBatchOperationInProgress.java | 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 72 ++++++++++++++++++++ 3 files changed, 76 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2182ca34/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7a9d4e2..3423473 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3202,10 +3202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Acquire row locks. If not, the whole batch will fail. acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); - if (cpMutation.getDurability() == Durability.SKIP_WAL) { - recordMutationWithoutWal(cpFamilyMap); - } - // Returned mutations from coprocessor correspond to the Mutation at index i. We can // directly add the cells from those mutations to the familyMaps of this mutation. mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later @@ -3227,6 +3223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } + // we use durability of the original mutation for the mutation passed by CP. if (tmpDur == Durability.SKIP_WAL) { recordMutationWithoutWal(m.getFamilyCellMap()); continue; @@ -3324,6 +3321,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We need to update the sequence id for following reasons. // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. // 2) If no WAL, FSWALEntry won't be used + // we use durability of the original mutation for the mutation passed by CP. boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; if (updateSeqId) { this.updateSequenceId(familyMaps[i].values(), @@ -7978,8 +7976,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i=0; i < listSize; i++) { Cell cell = cells.get(i); - // TODO we need include tags length also here. - mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength(); + mutationSize += KeyValueUtil.length(cell); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2182ca34/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index cdbecac..1ab2ef5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -110,7 +110,7 @@ public class MiniBatchOperationInProgress<T> { * Add more Mutations corresponding to the Mutation at the given index to be committed atomically * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. - * + * <b>Note:</b> The durability from CP will be replaced by the durability of corresponding mutation. * @param index the index that corresponds to the original mutation index in the batch * @param newOperations the Mutations to add */ http://git-wip-us.apache.org/repos/asf/hbase/blob/2182ca34/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 3e5b127..6c63cff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; @@ -2414,6 +2415,77 @@ public class TestHRegion { } @Test + public void testDataInMemoryWithoutWAL() throws IOException { + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); + FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); + HRegion region = initHRegion(tableName, null, null, name.getMethodName(), + CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); + + Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, + System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1); + final long originalSize = KeyValueUtil.length(originalCell); + + Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, + System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx")); + final long addSize = KeyValueUtil.length(addCell); + + LOG.info("originalSize:" + originalSize + + ", addSize:" + addSize); + // start test. We expect that the addPut's durability will be replaced + // by originalPut's durability. + + // case 1: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), + new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), + originalSize + addSize); + + // case 2: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), + new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), + originalSize + addSize); + + // case 3: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), + new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), + 0); + + // case 4: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), + new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), + 0); + } + + private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut, + final Put addPut, long delta) throws IOException { + final long initSize = region.getDataInMemoryWithoutWAL(); + // save normalCPHost and replaced by mockedCPHost + RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); + RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); + Answer<Boolean> answer = new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + MiniBatchOperationInProgress<Mutation> mb = invocation.getArgumentAt(0, + MiniBatchOperationInProgress.class); + mb.addOperationsFromCP(0, new Mutation[]{addPut}); + return false; + } + }; + when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class))) + .then(answer); + region.setCoprocessorHost(mockedCPHost); + region.put(originalPut); + region.setCoprocessorHost(normalCPHost); + final long finalSize = region.getDataInMemoryWithoutWAL(); + assertEquals("finalSize:" + finalSize + ", initSize:" + + initSize + ", delta:" + delta,finalSize, initSize + delta); + } + + @Test public void testDeleteColumns_PostInsert() throws IOException, InterruptedException { Delete delete = new Delete(row); delete.addColumns(fam1, qual1);