Repository: hbase Updated Branches: refs/heads/master 7130a222c -> c03ea895c
http://git-wip-us.apache.org/repos/asf/hbase/blob/c03ea895/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java new file mode 100644 index 0000000..b39f89b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.mapreduce.ResultSerialization; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.ExportProtos; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.SequenceFile; +import 
org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hbase.mapreduce.Export;
+import org.apache.hadoop.hbase.mapreduce.Import;
+
+/**
+ * Export an HBase table.
+ * Writes content to sequence files up in HDFS. Use {@link Import} to read it
+ * back in again.
+ * It is implemented by the endpoint technique: each region writes its own
+ * sequence file named {@code export-<encoded region name>} under the requested
+ * output directory.
+ * @see Export
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ExportEndpoint extends ExportProtos.ExportService
+    implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ExportEndpoint.class);
+
+  /** Region environment captured in {@link #start(CoprocessorEnvironment)}. */
+  private RegionCoprocessorEnvironment env = null;
+
+  /**
+   * Remembers the region environment this endpoint is attached to.
+   * @param environment the environment handed over when the coprocessor is started
+   * @throws CoprocessorException if the environment is not a
+   *   {@link RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment environment) throws IOException {
+    if (environment instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment) environment;
+    } else {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+  }
+
+  /**
+   * Nothing to release: the endpoint keeps no resources between calls.
+   */
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+
+  /**
+   * @return this object, which is itself the protobuf service implementation
+   */
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * @return the request's "compressed" flag, or {@code false} when the flag is not set
+   */
+  private static boolean getCompression(final ExportProtos.ExportRequest request) {
+    return request.hasCompressed() && request.getCompressed();
+  }
+
+  /**
+   * @return the compression type named in the request, or {@code null} when none is set
+   * @throws IllegalArgumentException if the name is not a {@link SequenceFile.CompressionType}
+   */
+  private static SequenceFile.CompressionType getCompressionType(
+      final ExportProtos.ExportRequest request) {
+    if (!request.hasCompressType()) {
+      return null;
+    }
+    return SequenceFile.CompressionType.valueOf(request.getCompressType());
+  }
+
+  /**
+   * Instantiates the compression codec whose class name is carried by the request.
+   * @return a new codec instance, or {@code null} when the request names no codec
+   * @throws IllegalArgumentException if the codec class cannot be found
+   */
+  private static CompressionCodec getCompressionCodec(final Configuration conf,
+      final ExportProtos.ExportRequest request) {
+    if (!request.hasCompressCodec()) {
+      return null;
+    }
+    try {
+      Class<? extends CompressionCodec> codecClass =
+          conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
+      return ReflectionUtils.newInstance(codecClass, conf);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Compression codec "
+          + request.getCompressCodec()+ " was not found.", e);
+    }
+  }
+
+  /**
+   * Builds the writer option that points at this region's output file,
+   * {@code <outputPath>/export-<encoded region name>}.
+   * @throws IOException if the file already exists or the file system cannot be reached
+   */
+  private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
+      final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+    Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
+    // Resolve the file system from the path itself rather than from the default
+    // file system, so a fully-qualified output path on another file system is
+    // checked against the file system it actually lives on.
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      throw new IOException(file + " exists");
+    }
+    return SequenceFile.Writer.file(file);
+  }
+
+  /**
+   * Collects all {@link SequenceFile.Writer} options for this region's export:
+   * key class, value class, output file and, when the request asks for compression
+   * and names a compression type, the compression settings.
+   */
+  private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
+      final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+    List<SequenceFile.Writer.Option> rval = new LinkedList<>();
+    rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
+    rval.add(SequenceFile.Writer.valueClass(Result.class));
+    rval.add(getOutputPath(conf, info, request));
+    if (getCompression(request)) {
+      SequenceFile.CompressionType type = getCompressionType(request);
+      if (type != null) {
+        CompressionCodec codec = getCompressionCodec(conf, request);
+        rval.add(SequenceFile.Writer.compression(type, codec));
+      }
+    }
+    return rval;
+  }
+
+  /**
+   * Narrows the scan so that it does not reach outside the given region.
+   * <p>
+   * The start row is raised to the region's start key when it lies before it.
+   * The stop row is lowered to the region's end key when the region is bounded and
+   * the scan either has no stop row or stops after the region's end. An empty
+   * end key / stop row means "unbounded", so it is never treated as a smaller key:
+   * in the last region the caller's stop row is kept as is.
+   * @param region the region being exported
+   * @param scan the scan requested by the client; modified in place
+   * @return the same {@code scan} instance
+   */
+  private Scan validateKey(final HRegionInfo region, final Scan scan) {
+    byte[] regionStartKey = region.getStartKey();
+    byte[] originStartKey = scan.getStartRow();
+    if (originStartKey == null
+        || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
+      scan.setStartRow(regionStartKey);
+    }
+    byte[] regionEndKey = region.getEndKey();
+    byte[] originEndKey = scan.getStopRow();
+    boolean regionIsUnbounded = regionEndKey == null || regionEndKey.length == 0;
+    boolean scanIsUnbounded = originEndKey == null || originEndKey.length == 0;
+    if (!regionIsUnbounded
+        && (scanIsUnbounded || Bytes.compareTo(originEndKey, regionEndKey) > 0)) {
+      // Clamp the STOP row (the original code moved the start row here, which made
+      // the scan of a middle region start at the region's end and export nothing).
+      scan.setStopRow(regionEndKey);
+    }
+    return scan;
+  }
+
+  /**
+   * Endpoint entry point: exports the rows of the hosting region that match the
+   * requested scan into a sequence file and reports the row and cell counts.
+   * On {@link IOException} the error is attached to the controller and logged;
+   * {@code done} is only invoked on success.
+   */
+  @Override
+  public void export(RpcController controller, ExportProtos.ExportRequest request,
+      RpcCallback<ExportProtos.ExportResponse> done) {
+    Region region = env.getRegion();
+    Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+    // Append ResultSerialization so that SequenceFile can serialize Result values.
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        ResultSerialization.class.getName());
+    try {
+      Scan scan = validateKey(region.getRegionInfo(), ProtobufUtil.toScan(request.getScan()));
+      ExportProtos.ExportResponse response = processData(conf, region, scan,
+          getWriterOptions(conf, region.getRegionInfo(), request));
+      done.run(response);
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
+      // Log with the throwable so the stack trace is not lost.
+      LOG.error("Failed to export region " + region.getRegionInfo().getEncodedName(), e);
+    }
+  }
+
+  /**
+   * Scans the region and appends every non-empty batch returned by
+   * {@link RegionScanner#nextRaw(List)} to the sequence file as one record
+   * (key = row of the first cell, value = {@link Result} built from the batch).
+   * The scan is bracketed by {@code startRegionOperation()} /
+   * {@code closeRegionOperation()}.
+   * @return a response carrying the number of records ("rows") and cells written
+   * @throws IOException if a batch mixes cells of different rows, or on scan/write failure
+   */
+  private static ExportProtos.ExportResponse processData(final Configuration conf,
+      final Region region, final Scan scan, final List<SequenceFile.Writer.Option> opts)
+      throws IOException {
+    region.startRegionOperation();
+    try (SequenceFile.Writer out = SequenceFile.createWriter(conf,
+            opts.toArray(new SequenceFile.Writer.Option[opts.size()]));
+        RegionScanner scanner = region.getScanner(scan)) {
+      ImmutableBytesWritable key = new ImmutableBytesWritable();
+      long rowCount = 0;
+      long cellCount = 0;
+      List<Cell> buf = new ArrayList<>();
+      boolean hasMore;
+      do {
+        hasMore = scanner.nextRaw(buf);
+        if (!buf.isEmpty()) {
+          Cell firstCell = buf.get(0);
+          // Sanity check: all cells of one batch must belong to the same row.
+          for (Cell cell : buf) {
+            if (Bytes.compareTo(
+                firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
+                cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
+              throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??");
+            }
+          }
+          key.set(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength());
+          out.append(key, Result.create(buf));
+          ++rowCount;
+          cellCount += buf.size();
+          buf.clear();
+        }
+      } while (hasMore);
+      return ExportProtos.ExportResponse.newBuilder()
+          .setRowCount(rowCount)
+          .setCellCount(cellCount)
+          .build();
+    } finally {
+      region.closeRegionOperation();
+    }
+  }
+
+  /**
+   * Command-line entry point; see {@link #run(Configuration, String[])}.
+   */
+  public static void main(String[] args) throws IOException, Throwable {
+    run(HBaseConfiguration.create(), args);
+  }
+
+  /**
+   * Client side of the export: validates the arguments, creates the output
+   * directory, and invokes the export endpoint on every region between the
+   * scan's start and stop rows.
+   * If anything fails, the output directory is deleted recursively before the
+   * failure is rethrown.
+   * @param conf configuration; generic options found in {@code args} are applied to it
+   * @param args the same arguments accepted by {@link Export}
+   * @return the per-region responses returned by {@code Table#coprocessorService}
+   */
+  public static Map<byte[], ExportProtos.ExportResponse> run(final Configuration conf,
+      final String[] args) throws IOException, Throwable {
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (!Export.checkArguments(otherArgs)) {
+      Export.usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    TableName tableName = TableName.valueOf(otherArgs[0]);
+    FileSystem fs = FileSystem.get(conf);
+    String dir = otherArgs[1];
+    checkDir(fs, dir);
+    Scan scan = Export.getConfiguredScanForJob(conf, otherArgs);
+    final ExportProtos.ExportRequest request = getConfiguredRequestForJob(conf, otherArgs, scan);
+    try (Connection con = ConnectionFactory.createConnection(conf);
+        Table table = con.getTable(tableName)) {
+      return table.coprocessorService(ExportProtos.ExportService.class,
+          scan.getStartRow(),
+          scan.getStopRow(),
+          new Batch.Call<ExportProtos.ExportService, ExportProtos.ExportResponse>() {
+            @Override
+            public ExportProtos.ExportResponse call(ExportProtos.ExportService service)
+                throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback =
+                  new BlockingRpcCallback<>();
+              service.export(controller, request, rpcCallback);
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              return rpcCallback.get();
+            }
+          });
+    } catch (Throwable e) {
+      // Do not leave a partially written export behind.
+      fs.delete(new Path(dir), true);
+      throw e;
+    }
+  }
+
+  /**
+   * Creates the output directory and opens it to all users (rwxrwxrwx).
+   * @throws RuntimeException if the directory already exists
+   */
+  private static void checkDir(final FileSystem fs, final String path) throws IOException {
+    Path dir = fs.makeQualified(new Path(path));
+    if (fs.exists(dir)) {
+      throw new RuntimeException("The " + path + " exists");
+    }
+    fs.mkdirs(dir);
+    fs.setPermission(dir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+  }
+
+  /**
+   * Builds the export request from the scan, the output directory ({@code args[1]})
+   * and the {@link FileOutputFormat} compression settings found in {@code conf}.
+   * Defaults when a setting is absent: compressed, RECORD compression type,
+   * {@link DefaultCodec}.
+   */
+  private static ExportProtos.ExportRequest getConfiguredRequestForJob(Configuration conf,
+      String[] args, final Scan scan) throws IOException {
+    String dir = args[1];
+    boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, true);
+    String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
+        SequenceFile.CompressionType.RECORD.toString());
+    String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
+        DefaultCodec.class.getName());
+    LOG.info("compressed=" + compressed
+        + ", compression type=" + compressionType
+        + ", compression codec=" + compressionCodec);
+    return ExportProtos.ExportRequest.newBuilder()
+        .setScan(ProtobufUtil.toScan(scan))
+        .setOutputPath(dir)
+        .setCompressed(compressed)
+        .setCompressCodec(compressionCodec)
+        .setCompressType(compressionType)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c03ea895/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index 56d229a..4ccbe30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -86,7 +86,7 @@ public class Export extends Configured implements Tool {
     return job;
   }
 
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
+  public static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
     Scan s = new Scan();
     // Optional arguments.
     // Set Scan Versions
@@ -150,7 +150,7 @@ public class Export extends Configured implements Tool {
   /*
    * @param errorMsg Error message. Can be null.
*/ - private static void usage(final String errorMsg) { + public static void usage(final String errorMsg) { if (errorMsg != null && errorMsg.length() > 0) { System.err.println("ERROR: " + errorMsg); } @@ -176,11 +176,12 @@ public class Export extends Configured implements Tool { System.err.println("For tables with very wide rows consider setting the batch size as below:\n" + " -D" + EXPORT_BATCHING + "=10"); } - - + public static boolean checkArguments(final String[] args) { + return args.length >= 2; + } @Override public int run(String[] args) throws Exception { - if (args.length < 2) { + if (!checkArguments(args)) { usage("Wrong number of arguments: " + args.length); return -1; } http://git-wip-us.apache.org/repos/asf/hbase/blob/c03ea895/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index 50146fd..7c74913 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -17,13 +17,7 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; @@ -34,11 +28,10 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.NavigableMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; @@ -59,6 +52,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ExportEndpoint; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.PrefixFilter; @@ -72,7 +67,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.AfterClass; @@ -85,6 +83,12 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; /** * Tests the table import and table export MR job functionality @@ -106,14 +110,40 @@ public class TestImportExport { private static final String EXPORT_BATCH_SIZE = "100"; private static long now = System.currentTimeMillis(); - + private static final Exporter EXPORTER_MR = new Exporter() { + @Override + public 
boolean runExport(String[] args) throws Throwable { + // need to make a copy of the configuration because to make sure different temp dirs are used. + int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); + return status == 0; + } + @Override + public String toString() { + return "MR-based export"; + } + }; + private static final Exporter EXPORTER_ENDPOINT = new Exporter() { + @Override + public boolean runExport(String[] args) throws Throwable { + ExportEndpoint.run(new Configuration(UTIL.getConfiguration()), args); + return true; + } + @Override + public String toString() { + return "Endpoint-based export"; + } + }; + private static final List<Exporter> EXPORTERS = Arrays.asList(EXPORTER_MR, EXPORTER_ENDPOINT); @BeforeClass public static void beforeClass() throws Exception { // Up the handlers; this test needs more than usual. + UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + ExportEndpoint.class.getName()); UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); UTIL.startMiniCluster(); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); FQ_OUTPUT_DIR = - new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); + new Path(OUTPUT_DIR).makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(); } @AfterClass @@ -131,47 +161,108 @@ public class TestImportExport { @Before @After - public void cleanup() throws Exception { + public void cleanup() throws IOException { + deleteOutput(); + } + + private static void deleteOutput() throws IOException { FileSystem fs = FileSystem.get(UTIL.getConfiguration()); fs.delete(new Path(OUTPUT_DIR), true); } /** - * Runs an export job with the specified command line args + * Runs an import job with the specified command line args * @param args * @return true if job completed successfully - * @throws IOException - * @throws InterruptedException - * @throws ClassNotFoundException + * 
@throws Exception */ - boolean runExport(String[] args) throws Exception { + boolean runImport(String[] args) throws Exception { // need to make a copy of the configuration because to make sure different temp dirs are used. - int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); + int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); return status == 0; } - /** - * Runs an import job with the specified command line args - * @param args - * @return true if job completed successfully + * Test the writer's options. * @throws IOException - * @throws InterruptedException - * @throws ClassNotFoundException */ - boolean runImport(String[] args) throws Exception { - // need to make a copy of the configuration because to make sure different temp dirs are used. - int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); - return status == 0; + @Test + public void testOutputFileFormat() throws IOException, Throwable { + String exportTable = "testOutputFileFormat"; + for (Exporter exporter : EXPORTERS) { + testOutputFileFormat(exportTable, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + deleteOutput(); + } + } + /** + * Test the writer's options. + * @throws IOException + */ + public void testOutputFileFormat(final String exportTable, final Exporter exporter) throws IOException, Throwable { + String codec = BZip2Codec.class.getName(); + String type = SequenceFile.CompressionType.RECORD.name(); + try (Table t = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) { + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILYA, QUAL, now, QUAL); + t.put(p); + p = new Put(ROW3); + p.addColumn(FAMILYA, QUAL, now, QUAL); + t.put(p); + } + //use compress + String[] args = new String[] { + // Only export row1 & row2. 
+ "-D" + FileOutputFormat.COMPRESS + "=true", + "-D" + FileOutputFormat.COMPRESS_CODEC + "=" + codec, + "-D" + FileOutputFormat.COMPRESS_TYPE + "=" + type, + exportTable, + FQ_OUTPUT_DIR + }; + assertTrue(exporter.toString(), exporter.runExport(args)); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + List<FileStatus> files = Arrays.asList(fs.listStatus(new Path(FQ_OUTPUT_DIR))); + assertEquals(exporter.toString(), false, files.isEmpty()); + Configuration copy = new Configuration(UTIL.getConfiguration()); + //need to make a copy of the configuration because to make sure the Exporter has set the "io.serializations" + copy.setStrings("io.serializations", copy.get("io.serializations"), + ResultSerialization.class.getName()); + for (FileStatus file : files) { + Path path = file.getPath(); + //skips the MR meta output + if (path.getName().equals("_SUCCESS")) { + continue; + } + try (SequenceFile.Reader reader = new SequenceFile.Reader( + copy, SequenceFile.Reader.file(file.getPath()))) { + assertEquals(exporter.toString(), reader.getCompressionCodec().getClass().getName(), codec); + assertEquals(exporter.toString(), reader.getCompressionType().name(), type); + } + } } - /** * Test simple replication case with column mapping - * @throws Exception + * @throws IOException */ @Test - public void testSimpleCase() throws Exception { - String EXPORT_TABLE = "exportSimpleCase"; - try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) { + public void testSimpleCase() throws IOException, Throwable { + String exportTable = "exportSimpleCase"; + String importTable = "importTableSimpleCase"; + for (Exporter exporter : EXPORTERS) { + testSimpleCase(exportTable, importTable, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + UTIL.deleteTable(TableName.valueOf(importTable)); + deleteOutput(); + } + } + /** + * Test simple replication case with column mapping. 
+ */ + public void testSimpleCase(final String exportTable, final String importTable, + final Exporter exporter) throws IOException, Throwable { + try (Table t = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) { Put p = new Put(ROW1); p.addColumn(FAMILYA, QUAL, now, QUAL); p.addColumn(FAMILYA, QUAL, now + 1, QUAL); @@ -189,53 +280,53 @@ public class TestImportExport { t.put(p); } - String[] args = new String[] { - // Only export row1 & row2. - "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", - "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", - EXPORT_TABLE, - FQ_OUTPUT_DIR, - "1000", // max number of key versions per key to export + String[] args = new String[] { + // Only export row1 & row2. + "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", + "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", + exportTable, + FQ_OUTPUT_DIR, + "1000", // max number of key versions per key to export + }; + assertTrue(exporter.toString(), exporter.runExport(args)); + + try (Table t = UTIL.createTable(TableName.valueOf(importTable), FAMILYB, 3);) { + args = new String[] { + "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, + importTable, + FQ_OUTPUT_DIR }; - assertTrue(runExport(args)); - - String IMPORT_TABLE = "importTableSimpleCase"; - try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) { - args = new String[] { - "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, - IMPORT_TABLE, - FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - - Get g = new Get(ROW1); - g.setMaxVersions(); - Result r = t.get(g); - assertEquals(3, r.size()); - g = new Get(ROW2); - g.setMaxVersions(); - r = t.get(g); - assertEquals(3, r.size()); - g = new Get(ROW3); - r = t.get(g); - assertEquals(0, r.size()); - } + assertTrue(exporter.toString(), runImport(args)); + + Get g = new Get(ROW1); + g.setMaxVersions(); + Result r = t.get(g); + assertEquals(exporter.toString(), 3, r.size()); + g = new Get(ROW2); + 
g.setMaxVersions(); + r = t.get(g); + assertEquals(exporter.toString(), 3, r.size()); + g = new Get(ROW3); + r = t.get(g); + assertEquals(exporter.toString(), 0, r.size()); + } } - /** * Test export hbase:meta table * - * @throws Exception + * @throws IOException */ @Test - public void testMetaExport() throws Exception { - String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString(); - String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" }; - assertTrue(runExport(args)); + public void testMetaExport() throws IOException, Throwable { + String exportTable = TableName.META_TABLE_NAME.getNameAsString(); + String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1", "0", "0" }; + for (Exporter exporter : EXPORTERS) { + assertTrue(exporter.toString(), exporter.runExport(args)); + deleteOutput(); + } } - /** - * Test import data from 0.94 exported file + * Test import data from 0.94 exported file. * @throws Exception */ @Test @@ -252,11 +343,11 @@ public class TestImportExport { Path importPath = new Path(f.toURI()); FileSystem fs = FileSystem.get(UTIL.getConfiguration()); fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); - String IMPORT_TABLE = name; - try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) { + String importTable = name; + try (Table t = UTIL.createTable(TableName.valueOf(importTable), Bytes.toBytes("f1"), 3);) { String[] args = new String[] { "-Dhbase.import.version=0.94" , - IMPORT_TABLE, FQ_OUTPUT_DIR + importTable, FQ_OUTPUT_DIR }; assertTrue(runImport(args)); /* exportedTableIn94Format contains 5 rows @@ -270,18 +361,28 @@ public class TestImportExport { assertEquals(5, UTIL.countRows(t)); } } - /** * Test export scanner batching + * @throws java.lang.IOException */ @Test - public void testExportScannerBatching() throws Exception { - String BATCH_TABLE = "exportWithBatch"; - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE)); + 
public void testExportScannerBatching() throws IOException, Throwable { + String exportTable = "exportWithBatch"; + for (Exporter exporter : EXPORTERS) { + testExportScannerBatching(exportTable, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + deleteOutput(); + } + } + /** + * Test export scanner batching. + */ + public void testExportScannerBatching(final String exportTable, final Exporter exporter) throws IOException, Throwable { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable)); desc.addFamily(new HColumnDescriptor(FAMILYA) .setMaxVersions(1) ); - UTIL.getHBaseAdmin().createTable(desc); + UTIL.getAdmin().createTable(desc); try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { Put p = new Put(ROW1); @@ -294,25 +395,34 @@ public class TestImportExport { String[] args = new String[] { "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. - BATCH_TABLE, + exportTable, FQ_OUTPUT_DIR }; - assertTrue(runExport(args)); + assertTrue(exporter.toString(), exporter.runExport(args)); FileSystem fs = FileSystem.get(UTIL.getConfiguration()); fs.delete(new Path(FQ_OUTPUT_DIR), true); } } - @Test - public void testWithDeletes() throws Exception { - String EXPORT_TABLE = "exportWithDeletes"; - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE)); + public void testWithDeletes() throws IOException, Throwable { + String exportTable = "exportWithDeletes"; + String importTable = "importWithDeletes"; + for (Exporter exporter : EXPORTERS) { + testWithDeletes(exportTable, importTable, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + UTIL.deleteTable(TableName.valueOf(importTable)); + deleteOutput(); + } + } + public void testWithDeletes(final String exportTable, final String importTable, + final Exporter exporter) throws IOException, Throwable { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable)); desc.addFamily(new 
HColumnDescriptor(FAMILYA) .setMaxVersions(5) .setKeepDeletedCells(KeepDeletedCells.TRUE) ); - UTIL.getHBaseAdmin().createTable(desc); + UTIL.getAdmin().createTable(desc); try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { Put p = new Put(ROW1); @@ -332,25 +442,23 @@ public class TestImportExport { String[] args = new String[] { "-D" + Export.RAW_SCAN + "=true", - EXPORT_TABLE, + exportTable, FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export }; - assertTrue(runExport(args)); - - String IMPORT_TABLE = "importWithDeletes"; - desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); + assertTrue(exporter.toString(), exporter.runExport(args)); + desc = new HTableDescriptor(TableName.valueOf(importTable)); desc.addFamily(new HColumnDescriptor(FAMILYA) .setMaxVersions(5) .setKeepDeletedCells(KeepDeletedCells.TRUE) ); - UTIL.getHBaseAdmin().createTable(desc); + UTIL.getAdmin().createTable(desc); try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { args = new String[] { - IMPORT_TABLE, - FQ_OUTPUT_DIR + importTable, + FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); + assertTrue(exporter.toString(), runImport(args)); Scan s = new Scan(); s.setMaxVersions(); @@ -358,29 +466,36 @@ public class TestImportExport { ResultScanner scanner = t.getScanner(s); Result r = scanner.next(); Cell[] res = r.rawCells(); - assertTrue(CellUtil.isDeleteFamily(res[0])); - assertEquals(now+4, res[1].getTimestamp()); - assertEquals(now+3, res[2].getTimestamp()); - assertTrue(CellUtil.isDelete(res[3])); - assertEquals(now+2, res[4].getTimestamp()); - assertEquals(now+1, res[5].getTimestamp()); - assertEquals(now, res[6].getTimestamp()); + assertTrue(exporter.toString(), CellUtil.isDeleteFamily(res[0])); + assertEquals(exporter.toString(), now+4, res[1].getTimestamp()); + assertEquals(exporter.toString(), now+3, res[2].getTimestamp()); + assertTrue(exporter.toString(), CellUtil.isDelete(res[3])); + assertEquals(exporter.toString(), now+2, 
res[4].getTimestamp()); + assertEquals(exporter.toString(), now+1, res[5].getTimestamp()); + assertEquals(exporter.toString(), now, res[6].getTimestamp()); } } - - @Test - public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception { - TableName EXPORT_TABLE = - TableName.valueOf("exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily"); - HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE); + public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws IOException, Throwable { + String exportTable = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily"; + String importTable = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily"; + for (Exporter exporter : EXPORTERS) { + testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily(exportTable, importTable, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + UTIL.deleteTable(TableName.valueOf(importTable)); + deleteOutput(); + } + } + public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily(final String exportTable, final String importTable, + final Exporter exporter) throws IOException, Throwable { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable)); desc.addFamily(new HColumnDescriptor(FAMILYA) .setMaxVersions(5) .setKeepDeletedCells(KeepDeletedCells.TRUE) ); - UTIL.getHBaseAdmin().createTable(desc); + UTIL.getAdmin().createTable(desc); - Table exportT = UTIL.getConnection().getTable(EXPORT_TABLE); + Table exportT = UTIL.getConnection().getTable(desc.getTableName()); //Add first version of QUAL Put p = new Put(ROW1); @@ -402,26 +517,24 @@ public class TestImportExport { String[] args = new String[] { - "-D" + Export.RAW_SCAN + "=true", EXPORT_TABLE.getNameAsString(), + "-D" + Export.RAW_SCAN + "=true", exportTable, FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export }; - assertTrue(runExport(args)); - - String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily"; - desc = 
new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); + assertTrue(exporter.toString(), exporter.runExport(args)); + desc = new HTableDescriptor(TableName.valueOf(importTable)); desc.addFamily(new HColumnDescriptor(FAMILYA) .setMaxVersions(5) .setKeepDeletedCells(KeepDeletedCells.TRUE) ); - UTIL.getHBaseAdmin().createTable(desc); + UTIL.getAdmin().createTable(desc); - Table importT = UTIL.getConnection().getTable(TableName.valueOf(IMPORT_TABLE)); + Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); args = new String[] { - IMPORT_TABLE, + importTable, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); + assertTrue(exporter.toString(), runImport(args)); Scan s = new Scan(); s.setMaxVersions(); @@ -434,26 +547,39 @@ public class TestImportExport { Result exportedTResult = exportedTScanner.next(); try { Result.compareResults(exportedTResult, importedTResult); - } catch (Exception e) { + } catch (IOException e) { fail("Original and imported tables data comparision failed with error:"+e.getMessage()); } finally { exportT.close(); importT.close(); } } - /** * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, * attempt with invalid values. */ @Test - public void testWithFilter() throws Exception { + public void testWithFilter() throws IOException, Throwable { + String exportTable = "exportSimpleCase_ImportWithFilter"; + String importTable = "importWithFilter"; + for (Exporter exporter : EXPORTERS) { + testWithFilter(exportTable, importTable, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + UTIL.deleteTable(TableName.valueOf(importTable)); + deleteOutput(); + } + } + /** + * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, + * attempt with invalid values. 
+ */ + public void testWithFilter(final String exportTable, final String importTable, + final Exporter exporter) throws IOException, Throwable { // Create simple table to export - String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter"; - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE)); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable)); desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); - UTIL.getHBaseAdmin().createTable(desc); - Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); + UTIL.getAdmin().createTable(desc); + Table exportT = UTIL.getConnection().getTable(desc.getTableName()); Put p1 = new Put(ROW1); p1.addColumn(FAMILYA, QUAL, now, QUAL); @@ -466,43 +592,42 @@ public class TestImportExport { Put p2 = new Put(ROW2); p2.addColumn(FAMILYA, QUAL, now, QUAL); - exportTable.put(Arrays.asList(p1, p2)); + exportT.put(Arrays.asList(p1, p2)); // Export the simple table - String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" }; - assertTrue(runExport(args)); + String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1000" }; + assertTrue(exporter.toString(), exporter.runExport(args)); // Import to a new table - String IMPORT_TABLE = "importWithFilter"; - desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); + desc = new HTableDescriptor(TableName.valueOf(importTable)); desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); - UTIL.getHBaseAdmin().createTable(desc); + UTIL.getAdmin().createTable(desc); - Table importTable = UTIL.getConnection().getTable(desc.getTableName()); + Table importT = UTIL.getConnection().getTable(desc.getTableName()); args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), - "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, + "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), importTable, FQ_OUTPUT_DIR, "1000" }; - 
assertTrue(runImport(args)); + assertTrue(exporter.toString(), runImport(args)); // get the count of the source table for that time range PrefixFilter filter = new PrefixFilter(ROW1); - int count = getCount(exportTable, filter); + int count = getCount(exportT, filter); - Assert.assertEquals("Unexpected row count between export and import tables", count, - getCount(importTable, null)); + Assert.assertEquals("Unexpected row count between export(" + exporter.toString() + ") and import tables", count, + getCount(importT, null)); // and then test that a broken command doesn't bork everything - easier here because we don't // need to re-run the export job args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), - "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE, + "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", exportTable, FQ_OUTPUT_DIR, "1000" }; assertFalse(runImport(args)); // cleanup - exportTable.close(); - importTable.close(); + exportT.close(); + importT.close(); } /** @@ -637,40 +762,50 @@ public class TestImportExport { Import.addFilterAndArguments(configuration, FilterBase.class, args); assertEquals("org.apache.hadoop.hbase.filter.FilterBase", - configuration.get(Import.FILTER_CLASS_CONF_KEY)); + configuration.get(Import.FILTER_CLASS_CONF_KEY)); assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); } - @Test - public void testDurability() throws Exception { + public void testDurability() throws IOException, Throwable { + String exportTable = "exporttestDurability"; + String importTable = "importTestDurability1"; + String importTableV2 = "importTestDurability2"; + for (Exporter exporter : EXPORTERS) { + testDurability(exportTable, importTable, importTableV2, exporter); + UTIL.deleteTable(TableName.valueOf(exportTable)); + UTIL.deleteTable(TableName.valueOf(importTable)); + UTIL.deleteTable(TableName.valueOf(importTableV2)); + deleteOutput(); + } + } 
+ public void testDurability(final String exportTable, final String importTable, final String importTable2, + final Exporter exporter) throws IOException, Throwable { // Create an export table. - String exportTableName = "exporttestDurability"; - try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { + try (Table exportT = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) { // Insert some data Put put = new Put(ROW1); put.addColumn(FAMILYA, QUAL, now, QUAL); put.addColumn(FAMILYA, QUAL, now + 1, QUAL); put.addColumn(FAMILYA, QUAL, now + 2, QUAL); - exportTable.put(put); + exportT.put(put); put = new Put(ROW2); put.addColumn(FAMILYA, QUAL, now, QUAL); put.addColumn(FAMILYA, QUAL, now + 1, QUAL); put.addColumn(FAMILYA, QUAL, now + 2, QUAL); - exportTable.put(put); + exportT.put(put); // Run the export - String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"}; - assertTrue(runExport(args)); + String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1000"}; + assertTrue(exporter.toString(), exporter.runExport(args)); // Create the table for import - String importTableName = "importTestDurability1"; - Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); + Table importT = UTIL.createTable(TableName.valueOf(importTable), FAMILYA, 3); // Register the wal listener for the import table HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + .getOnlineRegions(importT.getName()).get(0).getRegionInfo(); TableWALActionListener walListener = new TableWALActionListener(region); WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); wal.registerWALActionsListener(walListener); @@ -678,27 +813,26 @@ public class TestImportExport { // Run the import with SKIP_WAL args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), - 
importTableName, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); + importTable, FQ_OUTPUT_DIR }; + assertTrue(exporter.toString(), runImport(args)); //Assert that the wal is not visisted - assertTrue(!walListener.isWALVisited()); + assertTrue(exporter.toString(), !walListener.isWALVisited()); //Ensure that the count is 2 (only one version of key value is obtained) - assertTrue(getCount(importTable, null) == 2); + assertTrue(exporter.toString(), getCount(importT, null) == 2); // Run the import with the default durability option - importTableName = "importTestDurability2"; - importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); + importT = UTIL.createTable(TableName.valueOf(importTable2), FAMILYA, 3); region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + .getOnlineRegions(importT.getName()).get(0).getRegionInfo(); wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); walListener = new TableWALActionListener(region); wal.registerWALActionsListener(walListener); - args = new String[] { importTableName, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); + args = new String[] { importTable2, FQ_OUTPUT_DIR }; + assertTrue(exporter.toString(), runImport(args)); //Assert that the wal is visisted - assertTrue(walListener.isWALVisited()); + assertTrue(exporter.toString(), walListener.isWALVisited()); //Ensure that the count is 2 (only one version of key value is obtained) - assertTrue(getCount(importTable, null) == 2); + assertTrue(exporter.toString(), getCount(importT, null) == 2); } } @@ -727,4 +861,7 @@ public class TestImportExport { return isVisited; } } + public interface Exporter { + boolean runExport(final String[] args) throws Throwable; + } }