http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java new file mode 100644 index 0000000..7ef9b9c --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.common.collect.Lists; + +/** + * Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be + * removed when old non-secure client for backward compatibility is not supported. + */ +@RunWith(Parameterized.class) +@Category({RegionServerTests.class, LargeTests.class}) +public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad { + public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) { + super(duration); + } + + private static final Log LOG = + LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); + + @BeforeClass + public static void setUpBeforeClass() throws IOException { + conf.setInt("hbase.rpc.timeout", 10 * 1000); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + } + + public static class AtomicHFileLoader extends RepeatingTestThread { + final AtomicLong numBulkLoads = new AtomicLong(); + final AtomicLong numCompactions = new AtomicLong(); + private TableName tableName; + + public AtomicHFileLoader(TableName tableName, TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.tableName = tableName; + } + + public void doAnAction() throws Exception { + long iteration = numBulkLoads.getAndIncrement(); + Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", + iteration)); + + // create HFiles for different column families + FileSystem fs = UTIL.getTestFileSystem(); + byte[] val = Bytes.toBytes(String.format("%010d", iteration)); + final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>( + NUM_CFS); + for (int i = 0; i < NUM_CFS; i++) { + Path hfile = new Path(dir, family(i)); + byte[] fam = Bytes.toBytes(family(i)); + createHFile(fs, hfile, fam, QUAL, val, 1000); + famPaths.add(new Pair<>(fam, hfile.toString())); + } + + // bulk load HFiles + final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); + Table table = conn.getTable(tableName); + final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName); + RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); + ClientServiceCallable<Void> callable = + new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), + rpcControllerFactory.newController()) { + @Override + protected Void rpcCall() throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + try (Table table = conn.getTable(getTableName())) { + boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, + null, bulkToken, getLocation().getRegionInfo().getStartKey()); + } + return null; + } + }; + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); + caller.callWithRetries(callable, Integer.MAX_VALUE); + + // Periodically do compaction to reduce the number of open file handles. + if (numBulkLoads.get() % 5 == 0) { + // 5 * 50 = 250 open file handles! + callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), + rpcControllerFactory.newController()) { + @Override + protected Void rpcCall() throws Exception { + LOG.debug("compacting " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + AdminProtos.AdminService.BlockingInterface server = + conn.getAdmin(getLocation().getServerName()); + CompactRegionRequest request = + RequestConverter.buildCompactRegionRequest( + getLocation().getRegionInfo().getRegionName(), true, null); + server.compactRegion(null, request); + numCompactions.incrementAndGet(); + return null; + } + }; + caller.callWithRetries(callable, Integer.MAX_VALUE); + } + } + } + + void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) + throws Exception { + setupTable(tableName, 10); + + TestContext ctx = new TestContext(UTIL.getConfiguration()); + + AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); + ctx.addThread(loader); + + List<AtomicScanReader> scanners = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); + scanners.add(scanner); + ctx.addThread(scanner); + } + + ctx.startThreads(); + ctx.waitFor(millisToRun); + ctx.stop(); + + LOG.info("Loaders:"); + LOG.info(" loaded " + loader.numBulkLoads.get()); + LOG.info(" compations " + loader.numCompactions.get()); + + LOG.info("Scanners:"); + for (AtomicScanReader scanner : scanners) { + LOG.info(" scanned " + scanner.numScans.get()); + LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + } + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java new file mode 100644 index 0000000..9bff701 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java @@ -0,0 +1,480 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +@Category({RegionServerTests.class, MediumTests.class}) +public class TestServerCustomProtocol { + private static final Log LOG = LogFactory.getLog(TestServerCustomProtocol.class); + static final String WHOAREYOU = "Who are you?"; + static final String NOBODY = "nobody"; + static final String HELLO = "Hello, "; + + /* Test protocol implementation */ + public static class PingHandler extends PingProtos.PingService + implements Coprocessor, CoprocessorService { + private int counter = 0; + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) return; + throw new CoprocessorException("Must be loaded on a table region!"); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // Nothing to do. + } + + @Override + public void ping(RpcController controller, PingRequest request, + RpcCallback<PingResponse> done) { + this.counter++; + done.run(PingResponse.newBuilder().setPong("pong").build()); + } + + @Override + public void count(RpcController controller, CountRequest request, + RpcCallback<CountResponse> done) { + done.run(CountResponse.newBuilder().setCount(this.counter).build()); + } + + @Override + public void increment(RpcController controller, + IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) { + this.counter += request.getDiff(); + done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build()); + } + + @Override + public void hello(RpcController controller, HelloRequest request, + RpcCallback<HelloResponse> done) { + if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build()); + else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build()); + else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build()); + } + + @Override + public void noop(RpcController controller, NoopRequest request, + RpcCallback<NoopResponse> done) { + done.run(NoopResponse.newBuilder().build()); + } + + @Override + public Service getService() { + return this; + } + } + + private static final TableName TEST_TABLE = TableName.valueOf("test"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); + + private static final byte[] ROW_A = Bytes.toBytes("aaa"); + private static final byte[] ROW_B = Bytes.toBytes("bbb"); + private static final byte[] ROW_C = Bytes.toBytes("ccc"); + + private static final byte[] ROW_AB = Bytes.toBytes("abb"); + private static final byte[] ROW_BC = Bytes.toBytes("bcc"); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + PingHandler.class.getName()); + util.startMiniCluster(); + } + + @Before + public void before() throws Exception { + final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C }; + Table table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS); + + Put puta = new Put( ROW_A ); + puta.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1)); + table.put(puta); + + Put putb = new Put( ROW_B ); + putb.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1)); + table.put(putb); + + Put putc = new Put( ROW_C ); + putc.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1)); + table.put(putc); + } + + @After + public void after() throws Exception { + util.deleteTable(TEST_TABLE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testSingleProxy() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + Map<byte [], String> results = ping(table, null, null); + // There are three regions so should get back three results. + assertEquals(3, results.size()); + for (Map.Entry<byte [], String> e: results.entrySet()) { + assertEquals("Invalid custom protocol response", "pong", e.getValue()); + } + hello(table, "George", HELLO + "George"); + LOG.info("Did george"); + hello(table, null, "Who are you?"); + LOG.info("Who are you"); + hello(table, NOBODY, null); + LOG.info(NOBODY); + Map<byte [], Integer> intResults = table.coprocessorService(PingProtos.PingService.class, + null, null, + new Batch.Call<PingProtos.PingService, Integer>() { + @Override + public Integer call(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.CountResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.CountResponse>(); + instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback); + return rpcCallback.get().getCount(); + } + }); + int count = -1; + for (Map.Entry<byte [], Integer> e: intResults.entrySet()) { + assertTrue(e.getValue() > 0); + count = e.getValue(); + } + final int diff = 5; + intResults = table.coprocessorService(PingProtos.PingService.class, + null, null, + new Batch.Call<PingProtos.PingService, Integer>() { + @Override + public Integer call(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.IncrementCountResponse>(); + instance.increment(null, + PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(), + rpcCallback); + return rpcCallback.get().getCount(); + } + }); + // There are three regions so should get back three results. + assertEquals(3, results.size()); + for (Map.Entry<byte [], Integer> e: intResults.entrySet()) { + assertEquals(e.getValue().intValue(), count + diff); + } + table.close(); + } + + private Map<byte [], String> hello(final Table table, final String send, final String response) + throws ServiceException, Throwable { + Map<byte [], String> results = hello(table, send); + for (Map.Entry<byte [], String> e: results.entrySet()) { + assertEquals("Invalid custom protocol response", response, e.getValue()); + } + return results; + } + + private Map<byte [], String> hello(final Table table, final String send) + throws ServiceException, Throwable { + return hello(table, send, null, null); + } + + private Map<byte [], String> hello(final Table table, final String send, final byte [] start, + final byte [] end) + throws ServiceException, Throwable { + return table.coprocessorService(PingProtos.PingService.class, + start, end, + new Batch.Call<PingProtos.PingService, String>() { + @Override + public String call(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse>(); + PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder(); + if (send != null) builder.setName(send); + instance.hello(null, builder.build(), rpcCallback); + PingProtos.HelloResponse r = rpcCallback.get(); + return r != null && r.hasResponse()? r.getResponse(): null; + } + }); + } + + private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start, + final byte [] end) + throws ServiceException, Throwable { + return table.coprocessorService(PingProtos.PingService.class, + start, end, + new Batch.Call<PingProtos.PingService, String>() { + @Override + public String call(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse>(); + PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder(); + // Call ping on same instance. Use result calling hello on same instance. + builder.setName(doPing(instance)); + instance.hello(null, builder.build(), rpcCallback); + PingProtos.HelloResponse r = rpcCallback.get(); + return r != null && r.hasResponse()? r.getResponse(): null; + } + }); + } + + private Map<byte [], String> noop(final Table table, final byte [] start, + final byte [] end) + throws ServiceException, Throwable { + return table.coprocessorService(PingProtos.PingService.class, start, end, + new Batch.Call<PingProtos.PingService, String>() { + @Override + public String call(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.NoopResponse>(); + PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder(); + instance.noop(null, builder.build(), rpcCallback); + rpcCallback.get(); + // Looks like null is expected when void. That is what the test below is looking for + return null; + } + }); + } + + @Test + public void testSingleMethod() throws Throwable { + try (Table table = util.getConnection().getTable(TEST_TABLE); + RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) { + Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class, + null, ROW_A, + new Batch.Call<PingProtos.PingService, String>() { + @Override + public String call(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse>(); + instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); + return rpcCallback.get().getPong(); + } + }); + // Should have gotten results for 1 of the three regions only since we specified + // rows from 1 region + assertEquals(1, results.size()); + verifyRegionResults(locator, results, ROW_A); + + final String name = "NAME"; + results = hello(table, name, null, ROW_A); + // Should have gotten results for 1 of the three regions only since we specified + // rows from 1 region + assertEquals(1, results.size()); + verifyRegionResults(locator, results, "Hello, NAME", ROW_A); + } + } + + @Test + public void testRowRange() throws Throwable { + try (Table table = util.getConnection().getTable(TEST_TABLE); + RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) { + for (HRegionLocation e: locator.getAllRegionLocations()) { + LOG.info("Region " + e.getRegionInfo().getRegionNameAsString() + + ", servername=" + e.getServerName()); + } + // Here are what regions looked like on a run: + // + // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d. + // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e. + // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74. + + Map<byte [], String> results = ping(table, null, ROW_A); + // Should contain first region only. + assertEquals(1, results.size()); + verifyRegionResults(locator, results, ROW_A); + + // Test start row + empty end + results = ping(table, ROW_BC, null); + assertEquals(2, results.size()); + // should contain last 2 regions + HRegionLocation loc = locator.getRegionLocation(ROW_A, true); + assertNull("Should be missing region for row aaa (prior to start row)", + results.get(loc.getRegionInfo().getRegionName())); + verifyRegionResults(locator, results, ROW_B); + verifyRegionResults(locator, results, ROW_C); + + // test empty start + end + results = ping(table, null, ROW_BC); + // should contain the first 2 regions + assertEquals(2, results.size()); + verifyRegionResults(locator, results, ROW_A); + verifyRegionResults(locator, results, ROW_B); + loc = locator.getRegionLocation(ROW_C, true); + assertNull("Should be missing region for row ccc (past stop row)", + results.get(loc.getRegionInfo().getRegionName())); + + // test explicit start + end + results = ping(table, ROW_AB, ROW_BC); + // should contain first 2 regions + assertEquals(2, results.size()); + verifyRegionResults(locator, results, ROW_A); + verifyRegionResults(locator, results, ROW_B); + loc = locator.getRegionLocation(ROW_C, true); + assertNull("Should be missing region for row ccc (past stop row)", + results.get(loc.getRegionInfo().getRegionName())); + + // test single region + results = ping(table, ROW_B, ROW_BC); + // should only contain region bbb + assertEquals(1, results.size()); + verifyRegionResults(locator, results, ROW_B); + loc = locator.getRegionLocation(ROW_A, true); + assertNull("Should be missing region for row aaa (prior to start)", + results.get(loc.getRegionInfo().getRegionName())); + loc = locator.getRegionLocation(ROW_C, true); + assertNull("Should be missing region for row ccc (past stop row)", + results.get(loc.getRegionInfo().getRegionName())); + } + } + + private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end) + throws ServiceException, Throwable { + return table.coprocessorService(PingProtos.PingService.class, start, end, + new Batch.Call<PingProtos.PingService, String>() { + @Override + public String call(PingProtos.PingService instance) throws IOException { + return doPing(instance); + } + }); + } + + private static String doPing(PingProtos.PingService instance) throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse>(); + instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); + return rpcCallback.get().getPong(); + } + + @Test + public void testCompoundCall() throws Throwable { + try (Table table = util.getConnection().getTable(TEST_TABLE); + RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) { + Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C); + verifyRegionResults(locator, results, "Hello, pong", ROW_A); + verifyRegionResults(locator, results, "Hello, pong", ROW_B); + verifyRegionResults(locator, results, "Hello, pong", ROW_C); + } + } + + @Test + public void testNullCall() throws Throwable { + try (Table table = util.getConnection().getTable(TEST_TABLE); + RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) { + Map<byte[],String> results = hello(table, null, ROW_A, ROW_C); + verifyRegionResults(locator, results, "Who are you?", ROW_A); + verifyRegionResults(locator, results, "Who are you?", ROW_B); + verifyRegionResults(locator, results, "Who are you?", ROW_C); + } + } + + @Test + public void testNullReturn() throws Throwable { + try (Table table = util.getConnection().getTable(TEST_TABLE); + RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) { + Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C); + verifyRegionResults(locator, results, null, ROW_A); + verifyRegionResults(locator, results, null, ROW_B); + verifyRegionResults(locator, results, null, ROW_C); + } + } + + @Test + public void testEmptyReturnType() throws Throwable { + try (Table table = util.getConnection().getTable(TEST_TABLE)) { + Map<byte[],String> results = noop(table, ROW_A, ROW_C); + assertEquals("Should have results from three regions", 3, results.size()); + // all results should be null + for (Object v : results.values()) { + assertNull(v); + } + } + } + + private void verifyRegionResults(RegionLocator table, + Map<byte[],String> results, byte[] row) throws Exception { + verifyRegionResults(table, results, "pong", row); + } + + private void verifyRegionResults(RegionLocator regionLocator, + Map<byte[], String> results, String expected, byte[] row) + throws Exception { + for (Map.Entry<byte [], String> e: results.entrySet()) { + LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected + + ", result key=" + Bytes.toString(e.getKey()) + + ", value=" + e.getValue()); + } + HRegionLocation loc = regionLocator.getRegionLocation(row, true); + byte[] region = loc.getRegionInfo().getRegionName(); + assertTrue("Results should contain region " + + Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'", + results.containsKey(region)); + assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'", + expected, results.get(region)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java new file mode 100644 index 0000000..f54c632 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool { + + private static final Log LOG = LogFactory + .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); + conf1.set("hbase.replication.source.fs.conf.provider", + TestSourceFSConfigurationProvider.class.getCanonicalName()); + String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { + classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; + conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); + } + + TestReplicationBase.setUpBeforeClass(); + } + + @Override + public void testSyncUpTool() throws Exception { + /** + * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: + * 'cf1' : replicated 'norep': not replicated + */ + setupReplication(); + + /** + * Prepare 16 random hfile ranges required for creating hfiles + */ + Iterator<String> randomHFileRangeListIterator = null; + Set<String> randomHFileRanges = new HashSet<String>(16); + for (int i = 0; i < 16; i++) { + randomHFileRanges.add(UUID.randomUUID().toString()); + } + List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges); + Collections.sort(randomHFileRangeList); + randomHFileRangeListIterator = randomHFileRangeList.iterator(); + + /** + * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows + * into cf1, and 3 rows into norep verify correctly replicated to slave + */ + loadAndReplicateHFiles(true, randomHFileRangeListIterator); + + /** + * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load + * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and + * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave + * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step + * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not + * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to + * Slave + */ + mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); + + } + + private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) + throws Exception { + LOG.debug("mimicSyncUpAfterBulkLoad"); + utility2.shutdownMiniHBaseCluster(); + + loadAndReplicateHFiles(false, randomHFileRangeListIterator); + + int rowCount_ht1Source = utility1.countRows(ht1Source); + assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, + rowCount_ht1Source); + + int rowCount_ht2Source = utility1.countRows(ht2Source); + assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, + rowCount_ht2Source); + + utility1.shutdownMiniHBaseCluster(); + utility2.restartHBaseCluster(1); + + Thread.sleep(SLEEP_TIME); + + // Before sync up + int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); + int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); + + // Run sync up tool + syncUp(utility1); + + // After syun up + for (int i = 0; i < NB_RETRIES; i++) { + syncUp(utility1); + rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); + rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + if (i == NB_RETRIES - 1) { + if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) { + // syncUP still failed. Let's look at the source in case anything wrong there + utility1.restartHBaseCluster(1); + rowCount_ht1Source = utility1.countRows(ht1Source); + LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); + rowCount_ht2Source = utility1.countRows(ht2Source); + LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); + } + assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, + rowCount_ht1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, + rowCount_ht2TargetAtPeer1); + } + if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) { + LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); + break; + } else { + LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" + + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + + rowCount_ht2TargetAtPeer1); + } + Thread.sleep(SLEEP_TIME); + } + } + + private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, + Iterator<String> randomHFileRangeListIterator) throws Exception { + LOG.debug("loadAndReplicateHFiles"); + + // Load 100 + 3 hfiles to t1_syncup. + byte[][][] hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges, + 100); + + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source, + hfileRanges, 3); + + // Load 200 + 3 hfiles to t2_syncup. + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges, + 200); + + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source, + hfileRanges, 3); + + if (verifyReplicationOnSlave) { + // ensure replication completed + wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3, + "t1_syncup has 103 rows on source, and 100 on slave1"); + + wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3, + "t2_syncup has 203 rows on source, and 200 on slave1"); + } + } + + private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, + Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { + Path dir = utility1.getDataTestDirOnTestFS(testName); + FileSystem fs = utility1.getTestFileSystem(); + dir = dir.makeQualified(fs); + Path familyDir = new Path(dir, Bytes.toString(fam)); + + int hfileIdx = 0; + for (byte[][] range : hfileRanges) { + byte[] from = range[0]; + byte[] to = range[1]; + HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_" + + hfileIdx++), fam, row, from, to, numOfRows); + } + + final TableName tableName = source.getName(); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration()); + String[] args = { dir.toString(), tableName.toString() }; + loader.run(args); + } + + private void wait(Table target, int expectedCount, String msg) throws IOException, + InterruptedException { + for (int i = 0; i < NB_RETRIES; i++) { + int rowCount_ht2TargetAtPeer1 = utility2.countRows(target); + if (i == NB_RETRIES - 1) { + assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1); + } + if (expectedCount == rowCount_ht2TargetAtPeer1) { + break; + } + Thread.sleep(SLEEP_TIME); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/README.txt ---------------------------------------------------------------------- diff --git a/hbase-examples/README.txt b/hbase-examples/README.txt index 6578bb4..78051a6 100644 --- a/hbase-examples/README.txt +++ b/hbase-examples/README.txt @@ -62,3 +62,7 @@ Example code. 2. Execute {make}. 3. Execute {./DemoClient}. +Also includes example coprocessor endpoint examples. The protobuf files are at src/main/protobuf. +See hbase-protocol README.txt for how to generate the example RowCountService Coprocessor +Endpoint and Aggregator examples. + http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml index 22afa4db..2238857 100644 --- a/hbase-examples/pom.xml +++ b/hbase-examples/pom.xml @@ -169,6 +169,46 @@ <surefire.skipSecondPart>true</surefire.skipSecondPart> </properties> </profile> + <profile> + <id>compile-protobuf</id> + <activation> + <property> + <name>compile-protobuf</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <phase>generate-sources</phase> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <imports> + <param>${basedir}/src/main/protobuf</param> + </imports> + <source> + <directory>${basedir}/src/main/protobuf</directory> + <!-- Unfortunately, Hadoop plugin does not support *.proto. + We have to individually list every proto file here --> + <includes> + <include>Examples.proto</include> + </includes> + </source> + <!--<output>${project.build.directory}/generated-sources/java</output>--> + <output>${basedir}/src/main/java/</output> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> <!-- Profiles for building against different hadoop versions --> <!-- There are a lot of common dependencies used here, should investigate http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java index c9ab23c..7e6c290 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java @@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.Bu import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder; import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -171,7 +171,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor } catch (IOException ioe) { LOG.error(ioe); // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side. - ResponseConverter.setControllerException(controller, ioe); + CoprocessorRpcUtils.setControllerException(controller, ioe); } finally { if (scanner != null) { try { http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java index 4309cdc..c2387c5 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Bytes; @@ -94,7 +94,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService response = ExampleProtos.CountResponse.newBuilder() .setCount(count).build(); } catch (IOException ioe) { - ResponseConverter.setControllerException(controller, ioe); + CoprocessorRpcUtils.setControllerException(controller, ioe); } finally { if (scanner != null) { try { @@ -129,7 +129,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService response = ExampleProtos.CountResponse.newBuilder() .setCount(count).build(); } catch (IOException ioe) { - ResponseConverter.setControllerException(controller, ioe); + CoprocessorRpcUtils.setControllerException(controller, ioe); } finally { if (scanner != null) { try { http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/main/protobuf/Examples.proto ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/protobuf/Examples.proto b/hbase-examples/src/main/protobuf/Examples.proto new file mode 100644 index 0000000..ed9ed07 --- /dev/null +++ b/hbase-examples/src/main/protobuf/Examples.proto @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated"; +option java_outer_classname = "ExampleProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +message CountRequest { +} + +message CountResponse { + required int64 count = 1 [default = 0]; +} + +service RowCountService { + rpc getRowCount(CountRequest) + returns (CountResponse); + rpc getKeyValueCount(CountRequest) + returns (CountResponse); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java deleted file mode 100644 index 317081b..0000000 --- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java +++ /dev/null @@ -1,443 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.coprocessor.example; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.CoprocessorTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest; -import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder; -import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType; -import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse; -import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.experimental.categories.Category; - -@Category({CoprocessorTests.class, MediumTests.class}) -public class TestBulkDeleteProtocol { - private static final byte[] FAMILY1 = Bytes.toBytes("cf1"); - private static final byte[] FAMILY2 = Bytes.toBytes("cf2"); - private static final byte[] QUALIFIER1 = Bytes.toBytes("c1"); - private static final byte[] QUALIFIER2 = Bytes.toBytes("c2"); - private static final byte[] QUALIFIER3 = Bytes.toBytes("c3"); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - // @Ignore @BeforeClass - public static void setupBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, - BulkDeleteEndpoint.class.getName()); - TEST_UTIL.startMiniCluster(2); - } - - // @Ignore @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - // @Ignore @Test - public void testBulkDeleteEndpoint() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteEndpoint"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - byte[] rowkey = Bytes.toBytes(j); - puts.add(createPut(rowkey, "v1")); - } - ht.put(puts); - // Deleting all the rows. - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW, null); - assertEquals(100, noOfRowsDeleted); - - int rows = 0; - for (Result result : ht.getScanner(new Scan())) { - rows++; - } - assertEquals(0, rows); - ht.close(); - } - - // @Ignore @Test - public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion() - throws Throwable { - TableName tableName = TableName - .valueOf("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - byte[] rowkey = Bytes.toBytes(j); - puts.add(createPut(rowkey, "v1")); - } - ht.put(puts); - // Deleting all the rows. - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null); - assertEquals(100, noOfRowsDeleted); - - int rows = 0; - for (Result result : ht.getScanner(new Scan())) { - rows++; - } - assertEquals(0, rows); - ht.close(); - } - - private long invokeBulkDeleteProtocol(TableName tableName, final Scan scan, final int rowBatchSize, - final DeleteType deleteType, final Long timeStamp) throws Throwable { - Table ht = TEST_UTIL.getConnection().getTable(tableName); - long noOfDeletedRows = 0L; - Batch.Call<BulkDeleteService, BulkDeleteResponse> callable = - new Batch.Call<BulkDeleteService, BulkDeleteResponse>() { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<BulkDeleteResponse> rpcCallback = - new BlockingRpcCallback<BulkDeleteResponse>(); - - public BulkDeleteResponse call(BulkDeleteService service) throws IOException { - Builder builder = BulkDeleteRequest.newBuilder(); - builder.setScan(ProtobufUtil.toScan(scan)); - builder.setDeleteType(deleteType); - builder.setRowBatchSize(rowBatchSize); - if (timeStamp != null) { - builder.setTimestamp(timeStamp); - } - service.delete(controller, builder.build(), rpcCallback); - return rpcCallback.get(); - } - }; - Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan - .getStartRow(), scan.getStopRow(), callable); - for (BulkDeleteResponse response : result.values()) { - noOfDeletedRows += response.getRowsDeleted(); - } - ht.close(); - return noOfDeletedRows; - } - - // @Ignore @Test - public void testBulkDeleteWithConditionBasedDelete() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteWithConditionBasedDelete"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - byte[] rowkey = Bytes.toBytes(j); - String value = (j % 10 == 0) ? "v1" : "v2"; - puts.add(createPut(rowkey, value)); - } - ht.put(puts); - Scan scan = new Scan(); - FilterList fl = new FilterList(Operator.MUST_PASS_ALL); - SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3, - CompareOp.EQUAL, Bytes.toBytes("v1")); - // fl.addFilter(new FirstKeyOnlyFilter()); - fl.addFilter(scvf); - scan.setFilter(fl); - // Deleting all the rows where cf1:c1=v1 - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null); - assertEquals(10, noOfRowsDeleted); - - int rows = 0; - for (Result result : ht.getScanner(new Scan())) { - rows++; - } - assertEquals(90, rows); - ht.close(); - } - - // @Ignore @Test - public void testBulkDeleteColumn() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteColumn"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - byte[] rowkey = Bytes.toBytes(j); - String value = (j % 10 == 0) ? "v1" : "v2"; - puts.add(createPut(rowkey, value)); - } - ht.put(puts); - Scan scan = new Scan(); - scan.addColumn(FAMILY1, QUALIFIER2); - // Delete the column cf1:col2 - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null); - assertEquals(100, noOfRowsDeleted); - - int rows = 0; - for (Result result : ht.getScanner(new Scan())) { - assertEquals(2, result.getFamilyMap(FAMILY1).size()); - assertTrue(result.getColumnCells(FAMILY1, QUALIFIER2).isEmpty()); - assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER1).size()); - assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER3).size()); - rows++; - } - assertEquals(100, rows); - ht.close(); - } - - // @Ignore @Test - public void testBulkDeleteFamily() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteFamily"); - HTableDescriptor htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(FAMILY1)); - htd.addFamily(new HColumnDescriptor(FAMILY2)); - TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5); - Table ht = TEST_UTIL.getConnection().getTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - Put put = new Put(Bytes.toBytes(j)); - put.addColumn(FAMILY1, QUALIFIER1, "v1".getBytes()); - put.addColumn(FAMILY2, QUALIFIER2, "v2".getBytes()); - puts.add(put); - } - ht.put(puts); - Scan scan = new Scan(); - scan.addFamily(FAMILY1); - // Delete the column family cf1 - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null); - assertEquals(100, noOfRowsDeleted); - int rows = 0; - for (Result result : ht.getScanner(new Scan())) { - assertTrue(result.getFamilyMap(FAMILY1).isEmpty()); - assertEquals(1, result.getColumnCells(FAMILY2, QUALIFIER2).size()); - rows++; - } - assertEquals(100, rows); - ht.close(); - } - - // @Ignore @Test - public void testBulkDeleteColumnVersion() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteColumnVersion"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - Put put = new Put(Bytes.toBytes(j)); - byte[] value = "v1".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, 1234L, value); - put.addColumn(FAMILY1, QUALIFIER2, 1234L, value); - put.addColumn(FAMILY1, QUALIFIER3, 1234L, value); - // Latest version values - value = "v2".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, value); - put.addColumn(FAMILY1, QUALIFIER2, value); - put.addColumn(FAMILY1, QUALIFIER3, value); - put.addColumn(FAMILY1, null, value); - puts.add(put); - } - ht.put(puts); - Scan scan = new Scan(); - scan.addFamily(FAMILY1); - // Delete the latest version values of all the columns in family cf1. - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, - HConstants.LATEST_TIMESTAMP); - assertEquals(100, noOfRowsDeleted); - int rows = 0; - scan = new Scan(); - scan.setMaxVersions(); - for (Result result : ht.getScanner(scan)) { - assertEquals(3, result.getFamilyMap(FAMILY1).size()); - List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER1); - assertEquals(1, column.size()); - assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes())); - - column = result.getColumnCells(FAMILY1, QUALIFIER2); - assertEquals(1, column.size()); - assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes())); - - column = result.getColumnCells(FAMILY1, QUALIFIER3); - assertEquals(1, column.size()); - assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes())); - rows++; - } - assertEquals(100, rows); - ht.close(); - } - - // @Ignore @Test - public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteColumnVersionBasedOnTS"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - Put put = new Put(Bytes.toBytes(j)); - // TS = 1000L - byte[] value = "v1".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, 1000L, value); - put.addColumn(FAMILY1, QUALIFIER2, 1000L, value); - put.addColumn(FAMILY1, QUALIFIER3, 1000L, value); - // TS = 1234L - value = "v2".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, 1234L, value); - put.addColumn(FAMILY1, QUALIFIER2, 1234L, value); - put.addColumn(FAMILY1, QUALIFIER3, 1234L, value); - // Latest version values - value = "v3".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, value); - put.addColumn(FAMILY1, QUALIFIER2, value); - put.addColumn(FAMILY1, QUALIFIER3, value); - puts.add(put); - } - ht.put(puts); - Scan scan = new Scan(); - scan.addColumn(FAMILY1, QUALIFIER3); - // Delete the column cf1:c3's one version at TS=1234 - long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L); - assertEquals(100, noOfRowsDeleted); - int rows = 0; - scan = new Scan(); - scan.setMaxVersions(); - for (Result result : ht.getScanner(scan)) { - assertEquals(3, result.getFamilyMap(FAMILY1).size()); - assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER1).size()); - assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER2).size()); - List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER3); - assertEquals(2, column.size()); - assertTrue(CellUtil.matchingValue(column.get(0), "v3".getBytes())); - assertTrue(CellUtil.matchingValue(column.get(1), "v1".getBytes())); - rows++; - } - assertEquals(100, rows); - ht.close(); - } - - // @Ignore @Test - public void testBulkDeleteWithNumberOfVersions() throws Throwable { - TableName tableName = TableName.valueOf("testBulkDeleteWithNumberOfVersions"); - Table ht = createTable(tableName); - List<Put> puts = new ArrayList<Put>(100); - for (int j = 0; j < 100; j++) { - Put put = new Put(Bytes.toBytes(j)); - // TS = 1000L - byte[] value = "v1".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, 1000L, value); - put.addColumn(FAMILY1, QUALIFIER2, 1000L, value); - put.addColumn(FAMILY1, QUALIFIER3, 1000L, value); - // TS = 1234L - value = "v2".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, 1234L, value); - put.addColumn(FAMILY1, QUALIFIER2, 1234L, value); - put.addColumn(FAMILY1, QUALIFIER3, 1234L, value); - // TS = 2000L - value = "v3".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, 2000L, value); - put.addColumn(FAMILY1, QUALIFIER2, 2000L, value); - put.addColumn(FAMILY1, QUALIFIER3, 2000L, value); - // Latest version values - value = "v4".getBytes(); - put.addColumn(FAMILY1, QUALIFIER1, value); - put.addColumn(FAMILY1, QUALIFIER2, value); - put.addColumn(FAMILY1, QUALIFIER3, value); - puts.add(put); - } - ht.put(puts); - - // Delete all the versions of columns cf1:c1 and cf1:c2 falling with the time range - // [1000,2000) - final Scan scan = new Scan(); - scan.addColumn(FAMILY1, QUALIFIER1); - scan.addColumn(FAMILY1, QUALIFIER2); - scan.setTimeRange(1000L, 2000L); - scan.setMaxVersions(); - - long noOfDeletedRows = 0L; - long noOfVersionsDeleted = 0L; - Batch.Call<BulkDeleteService, BulkDeleteResponse> callable = - new Batch.Call<BulkDeleteService, BulkDeleteResponse>() { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<BulkDeleteResponse> rpcCallback = - new BlockingRpcCallback<BulkDeleteResponse>(); - - public BulkDeleteResponse call(BulkDeleteService service) throws IOException { - Builder builder = BulkDeleteRequest.newBuilder(); - builder.setScan(ProtobufUtil.toScan(scan)); - builder.setDeleteType(DeleteType.VERSION); - builder.setRowBatchSize(500); - service.delete(controller, builder.build(), rpcCallback); - return rpcCallback.get(); - } - }; - Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan - .getStartRow(), scan.getStopRow(), callable); - for (BulkDeleteResponse response : result.values()) { - noOfDeletedRows += response.getRowsDeleted(); - noOfVersionsDeleted += response.getVersionsDeleted(); - } - assertEquals(100, noOfDeletedRows); - assertEquals(400, noOfVersionsDeleted); - - int rows = 0; - Scan scan1 = new Scan(); - scan1.setMaxVersions(); - for (Result res : ht.getScanner(scan1)) { - assertEquals(3, res.getFamilyMap(FAMILY1).size()); - List<Cell> column = res.getColumnCells(FAMILY1, QUALIFIER1); - assertEquals(2, column.size()); - assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes())); - assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes())); - column = res.getColumnCells(FAMILY1, QUALIFIER2); - assertEquals(2, column.size()); - assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes())); - assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes())); - assertEquals(4, res.getColumnCells(FAMILY1, QUALIFIER3).size()); - rows++; - } - assertEquals(100, rows); - ht.close(); - } - - private Table createTable(TableName tableName) throws IOException { - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1); - hcd.setMaxVersions(10);// Just setting 10 as I am not testing with more than 10 versions here - htd.addFamily(hcd); - TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5); - Table ht = TEST_UTIL.getConnection().getTable(tableName); - return ht; - } - - private Put createPut(byte[] rowkey, String value) throws IOException { - Put put = new Put(rowkey); - put.addColumn(FAMILY1, QUALIFIER1, value.getBytes()); - put.addColumn(FAMILY1, QUALIFIER2, value.getBytes()); - put.addColumn(FAMILY1, QUALIFIER3, value.getBytes()); - return put; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java deleted file mode 100644 index 1776ced..0000000 --- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.coprocessor.example; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.testclassification.CoprocessorTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -import static junit.framework.Assert.*; - -/** - * Test case demonstrating client interactions with the {@link RowCountEndpoint} - * sample coprocessor Service implementation. - */ -@Category({CoprocessorTests.class, MediumTests.class}) -public class TestRowCountEndpoint { - private static final TableName TEST_TABLE = TableName.valueOf("testrowcounter"); - private static final byte[] TEST_FAMILY = Bytes.toBytes("f"); - private static final byte[] TEST_COLUMN = Bytes.toBytes("col"); - - private static HBaseTestingUtility TEST_UTIL = null; - private static Configuration CONF = null; - - // @Ignore @BeforeClass - public static void setupBeforeClass() throws Exception { - TEST_UTIL = new HBaseTestingUtility(); - CONF = TEST_UTIL.getConfiguration(); - CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - RowCountEndpoint.class.getName()); - - TEST_UTIL.startMiniCluster(); - TEST_UTIL.createTable(TEST_TABLE, new byte[][]{TEST_FAMILY}); - } - - // @Ignore @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - // @Ignore @Test - public void testEndpoint() throws Throwable { - Table table = TEST_UTIL.getConnection().getTable(TEST_TABLE); - - // insert some test rows - for (int i=0; i<5; i++) { - byte[] iBytes = Bytes.toBytes(i); - Put p = new Put(iBytes); - p.addColumn(TEST_FAMILY, TEST_COLUMN, iBytes); - table.put(p); - } - - final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance(); - Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class, - null, null, - new Batch.Call<ExampleProtos.RowCountService,Long>() { - public Long call(ExampleProtos.RowCountService counter) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = - new BlockingRpcCallback<ExampleProtos.CountResponse>(); - counter.getRowCount(controller, request, rpcCallback); - ExampleProtos.CountResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return (response != null && response.hasCount()) ? response.getCount() : 0; - } - }); - // should be one region with results - assertEquals(1, results.size()); - Iterator<Long> iter = results.values().iterator(); - Long val = iter.next(); - assertNotNull(val); - assertEquals(5l, val.longValue()); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java deleted file mode 100644 index e97d528..0000000 --- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.coprocessor.example; - -import static org.junit.Assert.assertEquals; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.testclassification.CoprocessorTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.ZooKeeper; -import org.junit.experimental.categories.Category; - -@Category({CoprocessorTests.class, MediumTests.class}) -public class TestZooKeeperScanPolicyObserver { - private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final byte[] F = Bytes.toBytes("fam"); - private static final byte[] Q = Bytes.toBytes("qual"); - private static final byte[] R = Bytes.toBytes("row"); - - // @BeforeClass - public static void setUpBeforeClass() throws Exception { - System.out.println("HERE!!!!!!!!"); - // Test we can first start the ZK cluster by itself - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - ZooKeeperScanPolicyObserver.class.getName()); - TEST_UTIL.startMiniZKCluster(); - TEST_UTIL.startMiniCluster(); - } - - // @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - // @Ignore @Test - public void testScanPolicyObserver() throws Exception { - TableName tableName = - TableName.valueOf("testScanPolicyObserver"); - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor(F) - .setMaxVersions(10) - .setTimeToLive(1); - desc.addFamily(hcd); - TEST_UTIL.getHBaseAdmin().createTable(desc); - Table t = TEST_UTIL.getConnection().getTable(tableName); - long now = EnvironmentEdgeManager.currentTime(); - - ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "test", null); - ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper(); - ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node); - // let's say test last backup was 1h ago - // using plain ZK here, because RecoverableZooKeeper add extra encoding to the data - zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now - 3600*1000), -1); - - LOG.debug("Set time: "+Bytes.toLong(Bytes.toBytes(now - 3600*1000))); - - // sleep for 1s to give the ZK change a chance to reach the watcher in the observer. - // TODO: Better to wait for the data to be propagated - Thread.sleep(1000); - - long ts = now - 2000; - Put p = new Put(R); - p.addColumn(F, Q, ts, Q); - t.put(p); - p = new Put(R); - p.addColumn(F, Q, ts + 1, Q); - t.put(p); - - // these two should be expired but for the override - // (their ts was 2s in the past) - Get g = new Get(R); - g.setMaxVersions(10); - Result r = t.get(g); - // still there? - assertEquals(2, r.size()); - - TEST_UTIL.flush(tableName); - TEST_UTIL.compact(tableName, true); - - g = new Get(R); - g.setMaxVersions(10); - r = t.get(g); - // still there? - assertEquals(2, r.size()); - zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1); - LOG.debug("Set time: "+now); - - TEST_UTIL.compact(tableName, true); - - g = new Get(R); - g.setMaxVersions(10); - r = t.get(g); - // should be gone now - assertEquals(0, r.size()); - t.close(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index c83c5e4..16f1e71 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -32,11 +32,11 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java index a237805..b53d5d0 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.TestMetaWithReplicas; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -66,7 +67,7 @@ public class IntegrationTestMetaReplicas { conf.get("zookeeper.znode.metaserver", "meta-region-server")); // check that the data in the znode is parseable (this would also mean the znode exists) byte[] data = ZKUtil.getData(zkw, primaryMetaZnode); - ServerName.parseFrom(data); + ProtobufUtil.toServerName(data); waitUntilZnodeAvailable(1); waitUntilZnodeAvailable(2); } http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index f41efc7..7ce86bd 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -26,9 +26,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import java.io.IOException; import java.net.InetSocketAddress; @@ -48,9 +48,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.Pair; http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-procedure/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-procedure/pom.xml b/hbase-procedure/pom.xml index 9d1ac8d..1a813b8 100644 --- a/hbase-procedure/pom.xml +++ b/hbase-procedure/pom.xml @@ -88,7 +88,7 @@ </dependency> <dependency> <groupId>org.apache.hbase</groupId> - <artifactId>hbase-protocol</artifactId> + <artifactId>hbase-protocol-shaded</artifactId> </dependency> <dependency> <groupId>org.apache.hbase</groupId> http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 6403cfd..84328a7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -29,20 +29,18 @@ import java.util.Map; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.ProcedureUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; -import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; -import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; /** * Base Procedure class responsible to handle the Procedure Metadata @@ -774,22 +772,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { } /** - * Helper to create the ProcedureInfo from Procedure. - */ - @InterfaceAudience.Private - public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) { - RemoteProcedureException exception = proc.hasException() ? proc.getException() : null; - return new ProcedureInfo(proc.getProcId(), proc.toStringClass(), proc.getOwner(), - ProcedureUtil.convertToProcedureState(proc.getState()), - proc.hasParent() ? proc.getParentProcId() : -1, nonceKey, - exception != null - ? new ProcedureUtil.ForeignExceptionMsg( - RemoteProcedureException.toProto(exception.getSource(), exception.getCause())) - : null, - proc.getLastUpdate(), proc.getStartTime(), proc.getResult()); - } - - /** * Helper to convert the procedure to protobuf. * Used by ProcedureStore implementations. */ @@ -833,7 +815,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { byte[] result = proc.getResult(); if (result != null) { - builder.setResult(ByteStringer.wrap(result)); + builder.setResult(ByteString.copyFrom(result)); } ByteString.Output stateStream = ByteString.newOutput();