Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 fea8fd6ab -> a548d9426


http://git-wip-us.apache.org/repos/asf/hbase/blob/a548d942/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
new file mode 100644
index 0000000..6ec4cd4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestCompactor {
+
+  public static StoreFile createDummyStoreFile(long maxSequenceId) throws Exception {
+    // "Files" are totally unused; it's the Scanner class below that gives the compactor fake KVs.
+    // But compaction depends on everything under the sun, so stub everything with dummies.
+    StoreFile sf = mock(StoreFile.class);
+    StoreFile.Reader r = mock(StoreFile.Reader.class);
+    when(r.length()).thenReturn(1L);
+    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
+    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
+        .thenReturn(mock(StoreFileScanner.class));
+    when(sf.getReader()).thenReturn(r);
+    when(sf.createReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
+    when(sf.cloneForReader()).thenReturn(sf);
+    when(sf.getMaxSequenceId()).thenReturn(maxSequenceId);
+    return sf;
+  }
+
+  public static CompactionRequest createDummyRequest() throws Exception {
+    return new CompactionRequest(Arrays.asList(createDummyStoreFile(1L)));
+  }
+
+  // StoreFile.Writer has a private ctor and is unwieldy, so this has to be convoluted.
+  public static class StoreFileWritersCapture
+      implements Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
+    public static class Writer {
+      public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+      public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+      public boolean hasMetadata;
+    }
+
+    private List<Writer> writers = new ArrayList<Writer>();
+
+    @Override
+    public StoreFile.Writer createWriter() throws IOException {
+      final Writer realWriter = new Writer();
+      writers.add(realWriter);
+      StoreFile.Writer writer = mock(StoreFile.Writer.class);
+      doAnswer(new Answer<Object>() {
+        public Object answer(InvocationOnMock invocation) {
+          return realWriter.kvs.add((KeyValue) invocation.getArguments()[0]);
+        }
+      }).when(writer).append(any(KeyValue.class));
+      doAnswer(new Answer<Object>() {
+        public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          return realWriter.data.put((byte[]) args[0], (byte[]) args[1]);
+        }
+      }).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
+      doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          realWriter.hasMetadata = true;
+          return null;
+        }
+      }).when(writer).appendMetadata(any(long.class), any(boolean.class));
+      doAnswer(new Answer<Path>() {
+        @Override
+        public Path answer(InvocationOnMock invocation) throws Throwable {
+          return new Path("foo");
+        }
+      }).when(writer).getPath();
+      return writer;
+    }
+
+    @Override
+    public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
+      return createWriter();
+    }
+
+    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
+      if (allFiles) {
+        assertEquals(kvss.length, writers.size());
+      }
+      int skippedWriters = 0;
+      for (int i = 0; i < kvss.length; ++i) {
+        KeyValue[] kvs = kvss[i];
+        if (kvs != null) {
+          Writer w = writers.get(i - skippedWriters);
+          if (requireMetadata) {
+            assertNotNull(w.data.get(STRIPE_START_KEY));
+            assertNotNull(w.data.get(STRIPE_END_KEY));
+          } else {
+            assertNull(w.data.get(STRIPE_START_KEY));
+            assertNull(w.data.get(STRIPE_END_KEY));
+          }
+          assertEquals(kvs.length, w.kvs.size());
+          for (int j = 0; j < kvs.length; ++j) {
+            assertEquals(kvs[j], w.kvs.get(j));
+          }
+        } else {
+          assertFalse(allFiles);
+          ++skippedWriters;
+        }
+      }
+    }
+
+    public void verifyBoundaries(byte[][] boundaries) {
+      assertEquals(boundaries.length - 1, writers.size());
+      for (int i = 0; i < writers.size(); ++i) {
+        assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
+        assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
+      }
+    }
+
+    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, List<Long> boundaries) {
+      if (allFiles) {
+        assertEquals(kvss.length, writers.size());
+      }
+      int skippedWriters = 0;
+      for (int i = 0; i < kvss.length; ++i) {
+        KeyValue[] kvs = kvss[i];
+        if (kvs != null) {
+          Writer w = writers.get(i - skippedWriters);
+          assertEquals(kvs.length, w.kvs.size());
+          for (int j = 0; j < kvs.length; ++j) {
+            assertTrue(kvs[j].getTimestamp() >= boundaries.get(i));
+            assertTrue(kvs[j].getTimestamp() < boundaries.get(i + 1));
+            assertEquals(kvs[j], w.kvs.get(j));
+          }
+        } else {
+          assertFalse(allFiles);
+          ++skippedWriters;
+        }
+      }
+    }
+
+    public List<Writer> getWriters() {
+      return writers;
+    }
+  }
+
+  public static class Scanner implements InternalScanner {
+    private final ArrayList<KeyValue> kvs;
+
+    public Scanner(KeyValue... kvs) {
+      this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+      if (kvs.isEmpty()) return false;
+      results.add(kvs.remove(0));
+      return !kvs.isEmpty();
+    }
+
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+      return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}
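
For context when reading the tests that follow: a mocked Store hands this StoreFileWritersCapture to createWriterInTmp(), the Scanner above feeds the compactor its fake KVs, and the capture is what the assertions run against. A rough sketch of that wiring, not part of the patch ("store" and the expected KVs are placeholders, and the usual Mockito static imports are assumed):

    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    // Every writer the compactor requests from the store is a mock whose appends land in "writers".
    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
    // The compactor subclass under test overrides createScanner() to return this fake input.
    InternalScanner scanner = new Scanner(KV_A, KV_B);
    // ... run compact() on the compactor under test ...
    // Then assert on what was "written": here, one file holding KV_A and KV_B, with no stripe metadata.
    writers.verifyKvs(new KeyValue[][] { new KeyValue[] { KV_A, KV_B } }, true, false);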

http://git-wip-us.apache.org/repos/asf/hbase/blob/a548d942/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
new file mode 100644
index 0000000..0c3c8b6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
+import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactor {
+
+  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
+
+  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
+
+  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
+
+  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
+
+  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
+
+  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
+
+  @Parameters(name = "{index}: usePrivateReaders={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { true }, new Object[] { false });
+  }
+
+  @Parameter
+  public boolean usePrivateReaders;
+
+  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
+      final KeyValue[] input, List<StoreFile> storefiles) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
+    final Scanner scanner = new Scanner(input);
+    // Create store mock that is satisfactory for compactor.
+    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
+    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
+    final Store store = mock(Store.class);
+    when(store.getStorefiles()).thenReturn(storefiles);
+    when(store.getFamily()).thenReturn(col);
+    when(store.getScanInfo()).thenReturn(si);
+    when(store.areWritesEnabled()).thenReturn(true);
+    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
+    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
+    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(store.getComparator()).thenReturn(new KVComparator());
+    long maxSequenceId = StoreFile.getMaxSequenceIdInList(storefiles);
+    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
+
+    return new DateTieredCompactor(conf, store) {
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+          byte[] dropDeletesToRow) throws IOException {
+        return scanner;
+      }
+
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+        return scanner;
+      }
+    };
+  }
+
+  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
+      boolean allFiles) throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StoreFile sf1 = createDummyStoreFile(1L);
+    StoreFile sf2 = createDummyStoreFile(2L);
+    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
+    List<Path> paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)),
+      boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
+    writers.verifyKvs(output, allFiles, boundaries);
+    if (allFiles) {
+      assertEquals(output.length, paths.size());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T[] a(T... a) {
+    return a;
+  }
+
+  @Test
+  public void test() throws Exception {
+    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
+      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
+    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
+      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
+    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
+      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
+  }
+
+  @Test
+  public void testEmptyOutputFile() throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    CompactionRequest request = createDummyRequest();
+    DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
+      new ArrayList<StoreFile>(request.getFiles()));
+    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
+      NoLimitThroughputController.INSTANCE, null);
+    assertEquals(1, paths.size());
+    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
+    assertEquals(1, dummyWriters.size());
+    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
+    assertTrue(dummyWriter.kvs.isEmpty());
+    assertTrue(dummyWriter.hasMetadata);
+  }
+}
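
One way to read the verify() calls above: the boundaries list defines half-open timestamp windows, and TestCompactor.verifyKvs(kvss, allFiles, boundaries) checks that writer i only received KVs with boundaries.get(i) <= timestamp < boundaries.get(i + 1). Spelling out the first case in test() with the constants above (illustrative only):

    // KV_A..KV_D carry timestamps 100, 200, 300 and 400.
    // Boundaries [100, 200, 300, 400, 500] define four windows:
    //   writer 0: [100, 200) -> KV_A
    //   writer 1: [200, 300) -> KV_B
    //   writer 2: [300, 400) -> KV_C
    //   writer 3: [400, 500) -> KV_D
    // which matches the expected output a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)) with allFiles == true.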

http://git-wip-us.apache.org/repos/asf/hbase/blob/a548d942/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 889b91a..4d47840 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -43,6 +43,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -64,8 +68,8 @@ import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
-import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -75,11 +79,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.ArgumentMatcher;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
+@RunWith(Parameterized.class)
 @Category(SmallTests.class)
 public class TestStripeCompactionPolicy {
   private static final byte[] KEY_A = Bytes.toBytes("aaa");
@@ -99,6 +105,13 @@ public class TestStripeCompactionPolicy {
   private final static int defaultInitialCount = 1;
   private static long defaultTtl = 1000 * 1000;
 
+  @Parameters(name = "{index}: usePrivateReaders={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { true }, new Object[] { false });
+  }
+
+  @Parameter
+  public boolean usePrivateReaders;
   @Test
   public void testNoStripesFromFlush() throws Exception {
     Configuration conf = HBaseConfiguration.create();
@@ -388,6 +401,7 @@ public class TestStripeCompactionPolicy {
     }
   }
 
+  @SuppressWarnings("unchecked")
   private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
       List<StoreFile>... stripeFiles) throws Exception {
     return createStripesWithFiles(createBoundaries(stripeFiles.length),
@@ -564,9 +578,10 @@ public class TestStripeCompactionPolicy {
   protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
       KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
+    StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(new KVComparator(), si,
+      input.length);
     StripeMultiFileWriter mw = req.createWriter();
-    mw.init(null, writers, new KeyValue.KVComparator());
+    mw.init(null, writers);
     for (KeyValue kv : input) {
       mw.append(kv);
     }
@@ -728,6 +743,7 @@ public class TestStripeCompactionPolicy {
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
+    when(sf.cloneForReader()).thenReturn(sf);
     return sf;
   }
 
@@ -740,7 +756,7 @@
     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
   }
 
-  private static StripeCompactor createCompactor() throws Exception {
+  private StripeCompactor createCompactor() throws Exception {
     HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
     Store store = mock(Store.class);
@@ -753,6 +769,7 @@ public class TestStripeCompactionPolicy {
         anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
     final Scanner scanner = new Scanner();
     return new StripeCompactor(conf, store) {
       @Override
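
The @Parameters/@Parameter pair added above is the standard JUnit Parameterized mechanism: the runner executes the whole class once per entry returned by data(), injects the entry into the public field before the tests run, and createCompactor() then pushes that flag into the configuration. A stripped-down sketch of the same pattern, with an illustrative class name that is not part of the patch:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameter;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class PrivateReadersToggleExample {
      @Parameters(name = "{index}: usePrivateReaders={0}")
      public static Iterable<Object[]> data() {
        return Arrays.asList(new Object[] { true }, new Object[] { false });
      }

      @Parameter
      public boolean usePrivateReaders; // injected by the runner before each run

      @Test
      public void test() {
        Configuration conf = HBaseConfiguration.create();
        // Same config key the tests in this patch toggle.
        conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
        // ... build and exercise a compactor with this conf ...
      }
    }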

http://git-wip-us.apache.org/repos/asf/hbase/blob/a548d942/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
new file mode 100644
index 0000000..cee118f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestStripeCompactor {
+  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
+  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
+
+  private static final byte[] KEY_B = Bytes.toBytes("bbb");
+  private static final byte[] KEY_C = Bytes.toBytes("ccc");
+  private static final byte[] KEY_D = Bytes.toBytes("ddd");
+
+  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
+  private static final KeyValue KV_B = kvAfter(KEY_B);
+  private static final KeyValue KV_C = kvAfter(KEY_C);
+  private static final KeyValue KV_D = kvAfter(KEY_D);
+
+  @Parameters(name = "{index}: usePrivateReaders={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { true }, new Object[] { false });
+  }
+
+  @Parameter
+  public boolean usePrivateReaders;
+
+  private static KeyValue kvAfter(byte[] key) {
+    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T[] a(T... a) {
+    return a;
+  }
+
+  private static KeyValue[] e() {
+    return TestStripeCompactor.<KeyValue> a();
+  }
+
+  @Test
+  public void testBoundaryCompactions() throws Exception {
+    // General verification
+    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
+      a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
+    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
+    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
+  }
+
+  @Test
+  public void testBoundaryCompactionEmptyFiles() throws Exception {
+    // No empty files if there are already files.
+    verifyBoundaryCompaction(a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null),
+      null, null, false);
+    verifyBoundaryCompaction(a(KV_A, KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D),
+      a(a(KV_A), null, a(KV_C)), null, null, false);
+    // But one should be created if there are no files at all.
+    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null,
+      null, false);
+    // The empty file goes into the major range, if there is one.
+    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B,
+      KEY_C, false);
+    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY,
+      KEY_C, false);
+    // Major range should have files regardless of KVs.
+    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
+      a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
+    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
+      a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
+
+  }
+
+  private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output)
+      throws Exception {
+    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
+  }
+
+  private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output,
+      byte[] majorFrom, byte[] majorTo, boolean allFiles) throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StripeCompactor sc = createCompactor(writers, input);
+    List<Path> paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom,
+      majorTo, NoLimitThroughputController.INSTANCE, null);
+    writers.verifyKvs(output, allFiles, true);
+    if (allFiles) {
+      assertEquals(output.length, paths.size());
+      writers.verifyBoundaries(boundaries);
+    }
+  }
+
+  @Test
+  public void testSizeCompactions() throws Exception {
+    // General verification with different sizes.
+    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
+    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
+    // Verify row boundaries are preserved.
+    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
+    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
+    // Too much data, count limits the number of files.
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A), a(KV_B, KV_C, KV_D)));
+    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
+      new KeyValue[][] { a(KV_A, KV_B, KV_C) });
+    // Too little data/large count, no extra files.
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A, KV_B), a(KV_C, KV_D)));
+  }
+
+  private void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, byte[] left,
+      byte[] right, KeyValue[][] output) throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StripeCompactor sc = createCompactor(writers, input);
+    List<Path> paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null,
+      null, NoLimitThroughputController.INSTANCE, null);
+    assertEquals(output.length, paths.size());
+    writers.verifyKvs(output, true, true);
+    List<byte[]> boundaries = new ArrayList<byte[]>();
+    boundaries.add(left);
+    for (int i = 1; i < output.length; ++i) {
+      boundaries.add(CellUtil.cloneRow(output[i][0]));
+    }
+    boundaries.add(right);
+    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
+  }
+
+  private StripeCompactor createCompactor(StoreFileWritersCapture writers, KeyValue[] input)
+      throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
+    final Scanner scanner = new Scanner(input);
+
+    // Create store mock that is satisfactory for compactor.
+    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
+    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
+    Store store = mock(Store.class);
+    when(store.getFamily()).thenReturn(col);
+    when(store.getScanInfo()).thenReturn(si);
+    when(store.areWritesEnabled()).thenReturn(true);
+    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
+    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
+    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(store.getComparator()).thenReturn(new KVComparator());
+
+    return new StripeCompactor(conf, store) {
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+          byte[] dropDeletesToRow) throws IOException {
+        return scanner;
+      }
+
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+        return scanner;
+      }
+    };
+  }
+}
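
A quick worked reading of the first testBoundaryCompactions() case, for anyone tracing the expected outputs: kvAfter(key) copies the key with one extra trailing 0x00 byte, so each KV sorts immediately after its boundary key (illustrative only):

    // KV_A = "aaa\x00", KV_B = "bbb\x00", KV_C = "ccc\x00", KV_D = "ddd\x00".
    // Boundaries [OPEN_KEY, "bbb", "ddd", OPEN_KEY] define three stripes:
    //   stripe 0: [OPEN, "bbb")  -> KV_A, KV_A
    //   stripe 1: ["bbb", "ddd") -> KV_B, KV_B, KV_C
    //   stripe 2: ["ddd", OPEN)  -> KV_D
    // which matches the expected a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)).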
