HBASE-17872 The MSLABImpl generates the invaild cells when unsafe is not availble
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df96d328 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df96d328 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df96d328 Branch: refs/heads/HBASE-16961 Commit: df96d328fb9fa11f04f84607e9a23f254f513202 Parents: 59e8b8e Author: CHIA-PING TSAI <chia7...@gmail.com> Authored: Sat Apr 8 17:37:37 2017 +0800 Committer: Chia-Ping Tsai <chia7...@gmail.com> Committed: Sun Apr 9 23:28:34 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/util/ByteBufferUtils.java | 30 ++-- .../hadoop/hbase/util/TestByteBufferUtils.java | 165 ++++++++++++++++++- .../hbase/util/TestFromClientSide3WoUnsafe.java | 43 +++++ 3 files changed, 213 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index ff4c843..34a4e02 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -43,15 +43,14 @@ import sun.nio.ch.DirectBuffer; @SuppressWarnings("restriction") @InterfaceAudience.Public public final class ByteBufferUtils { - // "Compressed integer" serialization helper constants. public final static int VALUE_MASK = 0x7f; public final static int NEXT_BIT_SHIFT = 7; public final static int NEXT_BIT_MASK = 1 << 7; @VisibleForTesting - static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable(); + final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable(); @VisibleForTesting - static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned(); + final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned(); private ByteBufferUtils() { } @@ -404,12 +403,11 @@ public final class ByteBufferUtils { } else if (UNSAFE_AVAIL) { UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); } else { - int outOldPos = out.position(); - out.position(destinationOffset); + ByteBuffer outDup = out.duplicate(); + outDup.position(destinationOffset); ByteBuffer inDup = in.duplicate(); inDup.position(sourceOffset).limit(sourceOffset + length); - out.put(inDup); - out.position(outOldPos); + outDup.put(inDup); } return destinationOffset + length; } @@ -990,7 +988,7 @@ public final class ByteBufferUtils { /** * Copies bytes from given array's offset to length part into the given buffer. Puts the bytes - * to buffer's given position. + * to buffer's given position. This doesn't affact the position of buffer. * @param out * @param in * @param inOffset @@ -1003,16 +1001,15 @@ public final class ByteBufferUtils { } else if (UNSAFE_AVAIL) { UnsafeAccess.copy(in, inOffset, out, outOffset, length); } else { - int oldPos = out.position(); - out.position(outOffset); - out.put(in, inOffset, length); - out.position(oldPos); + ByteBuffer outDup = out.duplicate(); + outDup.position(outOffset); + outDup.put(in, inOffset, length); } } /** * Copies specified number of bytes from given offset of 'in' ByteBuffer to - * the array. + * the array. This doesn't affact the position of buffer. * @param out * @param in * @param sourceOffset @@ -1026,10 +1023,9 @@ public final class ByteBufferUtils { } else if (UNSAFE_AVAIL) { UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); } else { - int oldPos = in.position(); - in.position(sourceOffset); - in.get(out, destinationOffset, length); - in.position(oldPos); + ByteBuffer inDup = in.duplicate(); + inDup.position(sourceOffset); + inDup.get(out, destinationOffset, length); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java index 053fb24..ee03c7b 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java @@ -27,14 +27,22 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.MiscTests; @@ -50,13 +58,13 @@ import org.junit.runners.Parameterized; @Category({MiscTests.class, SmallTests.class}) @RunWith(Parameterized.class) public class TestByteBufferUtils { - + private static final String UNSAFE_AVAIL_NAME = "UNSAFE_AVAIL"; + private static final String UNSAFE_UNALIGNED_NAME = "UNSAFE_UNALIGNED"; private byte[] array; @AfterClass public static void afterClass() throws Exception { - ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable(); - ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned(); + detectAvailabilityOfUnsafe(); } @Parameterized.Parameters @@ -69,15 +77,50 @@ public class TestByteBufferUtils { return paramList; } - public TestByteBufferUtils(boolean useUnsafeIfPossible) { + private static void setUnsafe(String fieldName, boolean value) throws Exception { + Field field = ByteBufferUtils.class.getDeclaredField(fieldName); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + int oldModifiers = field.getModifiers(); + modifiersField.setInt(field, oldModifiers & ~Modifier.FINAL); + try { + field.set(null, value); + } finally { + modifiersField.setInt(field, oldModifiers); + } + } + + static void disableUnsafe() throws Exception { + if (ByteBufferUtils.UNSAFE_AVAIL) { + setUnsafe(UNSAFE_AVAIL_NAME, false); + } + if (ByteBufferUtils.UNSAFE_UNALIGNED) { + setUnsafe(UNSAFE_UNALIGNED_NAME, false); + } + assertFalse(ByteBufferUtils.UNSAFE_AVAIL); + assertFalse(ByteBufferUtils.UNSAFE_UNALIGNED); + } + + static void detectAvailabilityOfUnsafe() throws Exception { + if (ByteBufferUtils.UNSAFE_AVAIL != UnsafeAvailChecker.isAvailable()) { + setUnsafe(UNSAFE_AVAIL_NAME, UnsafeAvailChecker.isAvailable()); + } + if (ByteBufferUtils.UNSAFE_UNALIGNED != UnsafeAvailChecker.unaligned()) { + setUnsafe(UNSAFE_UNALIGNED_NAME, UnsafeAvailChecker.unaligned()); + } + assertEquals(ByteBufferUtils.UNSAFE_AVAIL, UnsafeAvailChecker.isAvailable()); + assertEquals(ByteBufferUtils.UNSAFE_UNALIGNED, UnsafeAvailChecker.unaligned()); + } + + public TestByteBufferUtils(boolean useUnsafeIfPossible) throws Exception { if (useUnsafeIfPossible) { - ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable(); - ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned(); + detectAvailabilityOfUnsafe(); } else { - ByteBufferUtils.UNSAFE_AVAIL = false; - ByteBufferUtils.UNSAFE_UNALIGNED = false; + disableUnsafe(); } } + /** * Create an array with sample data. */ @@ -388,6 +431,111 @@ public class TestByteBufferUtils { assertEquals(i, buffer.getInt()); } + private void testCopyFromSrcToDestWithThreads(Object input, Object output, + List<Integer> lengthes, List<Integer> offsets) throws InterruptedException { + assertTrue((input instanceof ByteBuffer) || (input instanceof byte[])); + assertTrue((output instanceof ByteBuffer) || (output instanceof byte[])); + assertEquals(lengthes.size(), offsets.size()); + + final int threads = lengthes.size(); + CountDownLatch latch = new CountDownLatch(1); + List<Runnable> exes = new ArrayList<>(threads); + int oldInputPos = (input instanceof ByteBuffer) ? ((ByteBuffer) input).position() : 0; + int oldOutputPos = (output instanceof ByteBuffer) ? ((ByteBuffer) output).position() : 0; + for (int i = 0; i != threads; ++i) { + int offset = offsets.get(i); + int length = lengthes.get(i); + exes.add(() -> { + try { + latch.await(); + if (input instanceof ByteBuffer && output instanceof byte[]) { + ByteBufferUtils.copyFromBufferToArray((byte[]) output, + (ByteBuffer) input, offset, offset, length); + } + if (input instanceof byte[] && output instanceof ByteBuffer) { + ByteBufferUtils.copyFromArrayToBuffer((ByteBuffer) output, + offset, (byte[]) input, offset, length); + } + if (input instanceof ByteBuffer && output instanceof ByteBuffer) { + ByteBufferUtils.copyFromBufferToBuffer((ByteBuffer) input, + (ByteBuffer) output, offset, offset, length); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + }); + } + ExecutorService service = Executors.newFixedThreadPool(threads); + exes.forEach(service::execute); + latch.countDown(); + service.shutdown(); + assertTrue(service.awaitTermination(5, TimeUnit.SECONDS)); + if (input instanceof ByteBuffer) { + assertEquals(oldInputPos, ((ByteBuffer) input).position()); + } + if (output instanceof ByteBuffer) { + assertEquals(oldOutputPos, ((ByteBuffer) output).position()); + } + String inputString = (input instanceof ByteBuffer) ? + Bytes.toString(Bytes.toBytes((ByteBuffer) input)) : Bytes.toString((byte[]) input); + String outputString = (output instanceof ByteBuffer) ? + Bytes.toString(Bytes.toBytes((ByteBuffer) output)) : Bytes.toString((byte[]) output); + assertEquals(inputString, outputString); + } + + @Test + public void testCopyFromSrcToDestWithThreads() throws InterruptedException { + List<byte[]> words = Arrays.asList( + Bytes.toBytes("with"), + Bytes.toBytes("great"), + Bytes.toBytes("power"), + Bytes.toBytes("comes"), + Bytes.toBytes("great"), + Bytes.toBytes("responsibility") + ); + List<Integer> lengthes = words.stream().map(v -> v.length).collect(Collectors.toList()); + List<Integer> offsets = new ArrayList<>(words.size()); + for (int i = 0; i != words.size(); ++i) { + offsets.add(words.subList(0, i).stream().mapToInt(v -> v.length).sum()); + } + + int totalSize = words.stream().mapToInt(v -> v.length).sum(); + byte[] fullContent = new byte[totalSize]; + int offset = 0; + for (byte[] w : words) { + offset = Bytes.putBytes(fullContent, offset, w, 0, w.length); + } + + // test copyFromBufferToArray + for (ByteBuffer input : Arrays.asList( + ByteBuffer.allocateDirect(totalSize), + ByteBuffer.allocate(totalSize))) { + words.forEach(input::put); + byte[] output = new byte[totalSize]; + testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets); + } + + // test copyFromArrayToBuffer + for (ByteBuffer output : Arrays.asList( + ByteBuffer.allocateDirect(totalSize), + ByteBuffer.allocate(totalSize))) { + byte[] input = fullContent; + testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets); + } + + // test copyFromBufferToBuffer + for (ByteBuffer input : Arrays.asList( + ByteBuffer.allocateDirect(totalSize), + ByteBuffer.allocate(totalSize))) { + words.forEach(input::put); + for (ByteBuffer output : Arrays.asList( + ByteBuffer.allocateDirect(totalSize), + ByteBuffer.allocate(totalSize))) { + testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets); + } + } + } + @Test public void testCopyFromBufferToArray() { ByteBuffer buffer = ByteBuffer.allocate(15); @@ -492,4 +640,5 @@ public class TestByteBufferUtils { bb[i] = b; } } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java new file mode 100644 index 0000000..c04e76b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java @@ -0,0 +1,43 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.client.TestFromClientSide3; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({LargeTests.class, ClientTests.class}) +public class TestFromClientSide3WoUnsafe extends TestFromClientSide3 { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestByteBufferUtils.disableUnsafe(); + TestFromClientSide3.setUpBeforeClass(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestFromClientSide3.tearDownAfterClass(); + TestByteBufferUtils.detectAvailabilityOfUnsafe(); + } +}