Repository: hbase
Updated Branches:
  refs/heads/master 7130a222c -> c03ea895c


http://git-wip-us.apache.org/repos/asf/hbase/blob/c03ea895/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java
new file mode 100644
index 0000000..b39f89b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ExportEndpoint.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hbase.mapreduce.Export;
+import org.apache.hadoop.hbase.mapreduce.Import;
+
+/**
+* Export an HBase table.
+* Writes content to sequence files up in HDFS.  Use {@link Import} to read it
+* back in again.
+* It is implemented using the coprocessor endpoint technique.
+* @see Export
+*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ExportEndpoint extends ExportProtos.ExportService
+  implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ExportEndpoint.class);
+  private RegionCoprocessorEnvironment env = null;
+  @Override
+  public void start(CoprocessorEnvironment environment) throws IOException {
+    if (environment instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment) environment;
+    } else {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+  }
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+  private static boolean getCompression(final ExportProtos.ExportRequest request) {
+    if (request.hasCompressed()) {
+      return request.getCompressed();
+    } else {
+      return false;
+    }
+  }
+  private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
+    if (!request.hasCompressType()) {
+      return null;
+    }
+    return SequenceFile.CompressionType.valueOf(request.getCompressType());
+  }
+  private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
+    if (!request.hasCompressCodec()) {
+      return null;
+    }
+    try {
+      Class<? extends CompressionCodec> codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
+      return ReflectionUtils.newInstance(codecClass, conf);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Compression codec "
+              + request.getCompressCodec() + " was not found.", e);
+    }
+  }
+  private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
+          final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+    Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(file)) {
+      throw new IOException(file + " exists");
+    }
+    return SequenceFile.Writer.file(file);
+  }
+  private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
+          final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+    List<SequenceFile.Writer.Option> rval = new LinkedList<>();
+    rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
+    rval.add(SequenceFile.Writer.valueClass(Result.class));
+    rval.add(getOutputPath(conf, info, request));
+    boolean compressed = getCompression(request);
+    if (compressed) {
+      SequenceFile.CompressionType type = getCompressionType(request);
+      if (type != null) {
+        CompressionCodec codec = getCompressionCodec(conf, request);
+        rval.add(SequenceFile.Writer.compression(type, codec));
+      }
+    }
+    return rval;
+  }
+  private Scan validateKey(final HRegionInfo region, final Scan scan) {
+    byte[] regionStartKey = region.getStartKey();
+    byte[] originStartKey = scan.getStartRow();
+    if (originStartKey == null
+      || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
+      scan.setStartRow(regionStartKey);
+    }
+    byte[] regionEndKey = region.getEndKey();
+    byte[] originEndKey = scan.getStopRow();
+    if (originEndKey == null
+      || Bytes.compareTo(originEndKey, regionEndKey) > 0) {
+      scan.setStopRow(regionEndKey);
+    }
+    return scan;
+  }
+  @Override
+  public void export(RpcController controller, ExportProtos.ExportRequest request,
+    RpcCallback<ExportProtos.ExportResponse> done) {
+    Region region = env.getRegion();
+    Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+    conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
+    try {
+      Scan scan = validateKey(region.getRegionInfo(), ProtobufUtil.toScan(request.getScan()));
+      ExportProtos.ExportResponse response = processData(conf, region, scan,
+              getWriterOptions(conf, region.getRegionInfo(), request));
+      done.run(response);
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
+      LOG.error(e);
+    }
+  }
+  private static ExportProtos.ExportResponse processData(final Configuration conf,
+      final Region region, final Scan scan, final List<SequenceFile.Writer.Option> opts) throws IOException {
+    region.startRegionOperation();
+    try (SequenceFile.Writer out = SequenceFile.createWriter(conf,
+            opts.toArray(new SequenceFile.Writer.Option[opts.size()]));
+      RegionScanner scanner = region.getScanner(scan)) {
+      ImmutableBytesWritable key = new ImmutableBytesWritable();
+      long rowCount = 0;
+      long cellCount = 0;
+      List<Cell> buf = new ArrayList<>();
+      boolean hasMore;
+      do {
+        hasMore = scanner.nextRaw(buf);
+        if (!buf.isEmpty()) {
+          Cell firstCell = buf.get(0);
+          for (Cell cell : buf) {
+            if (Bytes.compareTo(
+              firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
+              cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
+              throw new IOException("RegionScanner#nextRaw returned cells from multiple rows");
+            }
+          }
+          key.set(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength());
+          out.append(key, Result.create(buf));
+          ++rowCount;
+          cellCount += buf.size();
+          buf.clear();
+        }
+      } while (hasMore);
+      return ExportProtos.ExportResponse.newBuilder()
+        .setRowCount(rowCount)
+        .setCellCount(cellCount)
+        .build();
+    } finally {
+      region.closeRegionOperation();
+    }
+  }
+  public static void main(String[] args) throws IOException, Throwable {
+    run(HBaseConfiguration.create(), args);
+  }
+  public static Map<byte[], ExportProtos.ExportResponse> run(final Configuration conf,
+          final String[] args) throws IOException, Throwable {
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (!Export.checkArguments(otherArgs)) {
+      Export.usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    TableName tableName = TableName.valueOf(otherArgs[0]);
+    FileSystem fs = FileSystem.get(conf);
+    String dir = otherArgs[1];
+    checkDir(fs, dir);
+    Scan scan = Export.getConfiguredScanForJob(conf, otherArgs);
+    final ExportProtos.ExportRequest request = getConfiguredRequestForJob(conf, otherArgs, scan);
+    try (Connection con = ConnectionFactory.createConnection(conf);
+            Table table = con.getTable(tableName)) {
+      return table.coprocessorService(ExportProtos.ExportService.class,
+            scan.getStartRow(),
+            scan.getStopRow(), new Batch.Call<ExportProtos.ExportService, ExportProtos.ExportResponse>() {
+            @Override
+            public ExportProtos.ExportResponse call(ExportProtos.ExportService service) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback = new BlockingRpcCallback<>();
+              service.export(controller, request, rpcCallback);
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              return rpcCallback.get();
+            }
+        });
+    } catch (Throwable e) {
+      fs.delete(new Path(dir), true);
+      throw e;
+    }
+  }
+  private static void checkDir(final FileSystem fs, final String path) throws IOException {
+    Path dir = fs.makeQualified(new Path(path));
+    if (fs.exists(dir)) {
+      throw new RuntimeException("The path " + path + " already exists");
+    }
+    fs.mkdirs(dir);
+    fs.setPermission(dir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+  }
+  private static ExportProtos.ExportRequest getConfiguredRequestForJob(Configuration conf,
+          String[] args, final Scan scan) throws IOException {
+    String dir = args[1];
+    boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, true);
+    String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
+        SequenceFile.CompressionType.RECORD.toString());
+    String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
+            DefaultCodec.class.getName());
+    LOG.info("compressed=" + compressed
+        + ", compression type=" + compressionType
+        + ", compression codec=" + compressionCodec);
+    return ExportProtos.ExportRequest.newBuilder()
+            .setScan(ProtobufUtil.toScan(scan))
+            .setOutputPath(dir)
+            .setCompressed(compressed)
+            .setCompressCodec(compressionCodec)
+            .setCompressType(compressionType)
+            .build();
+  }
+}
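
For reference, a minimal client-side sketch of driving the new endpoint. This is not
part of the commit; the class name and example arguments are illustrative. It simply
calls ExportEndpoint.run(), which (as shown above) fans the request out to every
region covered by the scan and returns one ExportResponse per region:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.ExportEndpoint;
import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;

public class ExportEndpointClient {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    // e.g. args = { "myTable", "/export/myTable" }; see Export#usage for the full option list
    Map<byte[], ExportProtos.ExportResponse> perRegion = ExportEndpoint.run(conf, args);
    long rows = 0;
    long cells = 0;
    // sum the per-region counters returned by each region server
    for (ExportProtos.ExportResponse r : perRegion.values()) {
      rows += r.getRowCount();
      cells += r.getCellCount();
    }
    System.out.println("exported " + rows + " rows / " + cells + " cells from "
        + perRegion.size() + " regions");
  }
}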

http://git-wip-us.apache.org/repos/asf/hbase/blob/c03ea895/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index 56d229a..4ccbe30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -86,7 +86,7 @@ public class Export extends Configured implements Tool {
     return job;
   }
 
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
+  public static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
     Scan s = new Scan();
     // Optional arguments.
     // Set Scan Versions
@@ -150,7 +150,7 @@ public class Export extends Configured implements Tool {
   /*
    * @param errorMsg Error message.  Can be null.
    */
-  private static void usage(final String errorMsg) {
+  public static void usage(final String errorMsg) {
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
@@ -176,11 +176,12 @@ public class Export extends Configured implements Tool {
     System.err.println("For tables with very wide rows consider setting the 
batch size as below:\n"
         + "   -D" + EXPORT_BATCHING + "=10");
   }
-
-
+  public static boolean checkArguments(final String[] args) {
+    return args.length >= 2;
+  }
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length < 2) {
+    if (!checkArguments(args)) {
       usage("Wrong number of arguments: " + args.length);
       return -1;
     }
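
A hedged sketch of how the compression switches consumed by the endpoint's
getConfiguredRequestForJob() can be set programmatically instead of via -D options
on the command line. The table name and output path are illustrative; the
configuration keys are the standard FileOutputFormat ones the endpoint reads:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.ExportEndpoint;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedExportExample {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    // same keys ExportEndpoint#getConfiguredRequestForJob reads
    conf.setBoolean(FileOutputFormat.COMPRESS, true);
    conf.set(FileOutputFormat.COMPRESS_TYPE, SequenceFile.CompressionType.RECORD.toString());
    conf.set(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class.getName());
    // the endpoint packs these settings into the protobuf request for each region
    ExportEndpoint.run(conf, new String[] { "myTable", "/export/myTable" });
  }
}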

http://git-wip-us.apache.org/repos/asf/hbase/blob/c03ea895/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 50146fd..7c74913 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -17,13 +17,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
@@ -34,11 +28,10 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.NavigableMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
@@ -59,6 +52,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ExportEndpoint;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
@@ -72,7 +67,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -85,6 +83,12 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests the table import and table export MR job functionality
@@ -106,14 +110,40 @@ public class TestImportExport {
   private static final String EXPORT_BATCH_SIZE = "100";
 
   private static long now = System.currentTimeMillis();
-
+  private static final Exporter EXPORTER_MR = new Exporter() {
+      @Override
+      public boolean runExport(String[] args) throws Throwable {
+        // need to make a copy of the configuration to make sure different temp dirs are used.
+        int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
+        return status == 0;
+      }
+      @Override
+      public String toString() {
+        return "MR-based export";
+      }
+  };
+  private static final Exporter EXPORTER_ENDPOINT = new Exporter() {
+      @Override
+      public boolean runExport(String[] args) throws Throwable {
+        ExportEndpoint.run(new Configuration(UTIL.getConfiguration()), args);
+        return true;
+      }
+      @Override
+      public String toString() {
+        return "Endpoint-based export";
+      }
+  };
+  private static final List<Exporter> EXPORTERS = Arrays.asList(EXPORTER_MR, EXPORTER_ENDPOINT);
   @BeforeClass
   public static void beforeClass() throws Exception {
     // Up the handlers; this test needs more than usual.
+    UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+            ExportEndpoint.class.getName());
     UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
     UTIL.startMiniCluster();
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
     FQ_OUTPUT_DIR =
-      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
+      new Path(OUTPUT_DIR).makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
   }
 
   @AfterClass
@@ -131,47 +161,108 @@ public class TestImportExport {
 
   @Before
   @After
-  public void cleanup() throws Exception {
+  public void cleanup() throws IOException {
+    deleteOutput();
+  }
+
+  private static void deleteOutput() throws IOException {
     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
     fs.delete(new Path(OUTPUT_DIR), true);
   }
 
   /**
-   * Runs an export job with the specified command line args
+   * Runs an import job with the specified command line args
    * @param args
    * @return true if job completed successfully
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
+   * @throws Exception
    */
-  boolean runExport(String[] args) throws Exception {
+  boolean runImport(String[] args) throws Exception {
     // need to make a copy of the configuration because to make sure different temp dirs are used.
-    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
+    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
     return status == 0;
   }
-
   /**
-   * Runs an import job with the specified command line args
-   * @param args
-   * @return true if job completed successfully
+   * Test the writer's options.
    * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
    */
-  boolean runImport(String[] args) throws Exception {
-    // need to make a copy of the configuration because to make sure different temp dirs are used.
-    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
-    return status == 0;
+  @Test
+  public void testOutputFileFormat() throws IOException, Throwable {
+    String exportTable = "testOutputFileFormat";
+    for (Exporter exporter : EXPORTERS) {
+      testOutputFileFormat(exportTable, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      deleteOutput();
+    }
+  }
+  /**
+   * Test the writer's options.
+   * @throws IOException
+   */
+  public void testOutputFileFormat(final String exportTable, final Exporter exporter) throws IOException, Throwable {
+    String codec = BZip2Codec.class.getName();
+    String type = SequenceFile.CompressionType.RECORD.name();
+    try (Table t = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) {
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      t.put(p);
+      p = new Put(ROW2);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      t.put(p);
+      p = new Put(ROW3);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      t.put(p);
+    }
+    // export with compression enabled
+    String[] args = new String[] {
+      "-D" + FileOutputFormat.COMPRESS + "=true",
+      "-D" + FileOutputFormat.COMPRESS_CODEC + "=" + codec,
+      "-D" + FileOutputFormat.COMPRESS_TYPE + "=" + type,
+      exportTable,
+      FQ_OUTPUT_DIR
+    };
+    assertTrue(exporter.toString(), exporter.runExport(args));
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    List<FileStatus> files = Arrays.asList(fs.listStatus(new Path(FQ_OUTPUT_DIR)));
+    assertFalse(exporter.toString(), files.isEmpty());
+    Configuration copy = new Configuration(UTIL.getConfiguration());
+    // register ResultSerialization in a copy of the configuration so the reader can deserialize the exported Result values
+    copy.setStrings("io.serializations", copy.get("io.serializations"),
+            ResultSerialization.class.getName());
+    for (FileStatus file : files) {
+      Path path = file.getPath();
+      //skips the MR meta output
+      if (path.getName().equals("_SUCCESS")) {
+        continue;
+      }
+      try (SequenceFile.Reader reader = new SequenceFile.Reader(
+              copy, SequenceFile.Reader.file(file.getPath()))) {
+        assertEquals(exporter.toString(), codec, reader.getCompressionCodec().getClass().getName());
+        assertEquals(exporter.toString(), type, reader.getCompressionType().name());
+      }
+    }
   }
-
   /**
    * Test simple replication case with column mapping
-   * @throws Exception
+   * @throws IOException
    */
   @Test
-  public void testSimpleCase() throws Exception {
-    String EXPORT_TABLE = "exportSimpleCase";
-    try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) {
+  public void testSimpleCase() throws IOException, Throwable {
+    String exportTable = "exportSimpleCase";
+    String importTable = "importTableSimpleCase";
+    for (Exporter exporter : EXPORTERS) {
+      testSimpleCase(exportTable, importTable, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      UTIL.deleteTable(TableName.valueOf(importTable));
+      deleteOutput();
+    }
+  }
+  /**
+   * Test simple replication case with column mapping.
+   */
+  public void testSimpleCase(final String exportTable, final String importTable,
+          final Exporter exporter) throws IOException, Throwable {
+    try (Table t = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) {
       Put p = new Put(ROW1);
       p.addColumn(FAMILYA, QUAL, now, QUAL);
       p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
@@ -189,53 +280,53 @@ public class TestImportExport {
       t.put(p);
     }
 
-      String[] args = new String[] {
-          // Only export row1 & row2.
-          "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
-          "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
-          EXPORT_TABLE,
-          FQ_OUTPUT_DIR,
-          "1000", // max number of key versions per key to export
+    String[] args = new String[] {
+      // Only export row1 & row2.
+      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
+      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
+      exportTable,
+      FQ_OUTPUT_DIR,
+      "1000", // max number of key versions per key to export
+    };
+    assertTrue(exporter.toString(), exporter.runExport(args));
+
+    try (Table t = UTIL.createTable(TableName.valueOf(importTable), FAMILYB, 3);) {
+      args = new String[] {
+          "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
+          importTable,
+          FQ_OUTPUT_DIR
       };
-      assertTrue(runExport(args));
-
-      String IMPORT_TABLE = "importTableSimpleCase";
-      try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
-        args = new String[] {
-            "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
-            IMPORT_TABLE,
-            FQ_OUTPUT_DIR
-        };
-        assertTrue(runImport(args));
-
-        Get g = new Get(ROW1);
-        g.setMaxVersions();
-        Result r = t.get(g);
-        assertEquals(3, r.size());
-        g = new Get(ROW2);
-        g.setMaxVersions();
-        r = t.get(g);
-        assertEquals(3, r.size());
-        g = new Get(ROW3);
-        r = t.get(g);
-        assertEquals(0, r.size());
-      }
+      assertTrue(exporter.toString(), runImport(args));
+
+      Get g = new Get(ROW1);
+      g.setMaxVersions();
+      Result r = t.get(g);
+      assertEquals(exporter.toString(), 3, r.size());
+      g = new Get(ROW2);
+      g.setMaxVersions();
+      r = t.get(g);
+      assertEquals(exporter.toString(), 3, r.size());
+      g = new Get(ROW3);
+      r = t.get(g);
+      assertEquals(exporter.toString(), 0, r.size());
+    }
   }
-
   /**
    * Test export hbase:meta table
    *
-   * @throws Exception
+   * @throws IOException
    */
   @Test
-  public void testMetaExport() throws Exception {
-    String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
-    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
-    assertTrue(runExport(args));
+  public void testMetaExport() throws IOException, Throwable {
+    String exportTable = TableName.META_TABLE_NAME.getNameAsString();
+    String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1", "0", "0" };
+    for (Exporter exporter : EXPORTERS) {
+      assertTrue(exporter.toString(), exporter.runExport(args));
+      deleteOutput();
+    }
   }
-
   /**
-   * Test import data from 0.94 exported file
+   * Test import data from 0.94 exported file.
    * @throws Exception
    */
   @Test
@@ -252,11 +343,11 @@ public class TestImportExport {
     Path importPath = new Path(f.toURI());
     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
     fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
-    String IMPORT_TABLE = name;
-    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
+    String importTable = name;
+    try (Table t = UTIL.createTable(TableName.valueOf(importTable), Bytes.toBytes("f1"), 3);) {
       String[] args = new String[] {
           "-Dhbase.import.version=0.94" ,
-          IMPORT_TABLE, FQ_OUTPUT_DIR
+          importTable, FQ_OUTPUT_DIR
       };
       assertTrue(runImport(args));
       /* exportedTableIn94Format contains 5 rows
@@ -270,18 +361,28 @@ public class TestImportExport {
      assertEquals(5, UTIL.countRows(t));
     }
   }
-
   /**
    * Test export scanner batching
+   * @throws java.lang.IOException
    */
    @Test
-   public void testExportScannerBatching() throws Exception {
-    String BATCH_TABLE = "exportWithBatch";
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
+  public void testExportScannerBatching() throws IOException, Throwable {
+    String exportTable = "exportWithBatch";
+    for (Exporter exporter : EXPORTERS) {
+      testExportScannerBatching(exportTable, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      deleteOutput();
+    }
+  }
+  /**
+   * Test export scanner batching.
+   */
+  public void testExportScannerBatching(final String exportTable, final Exporter exporter) throws IOException, Throwable {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA)
         .setMaxVersions(1)
     );
-    UTIL.getHBaseAdmin().createTable(desc);
+    UTIL.getAdmin().createTable(desc);
     try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
 
       Put p = new Put(ROW1);
@@ -294,25 +395,34 @@ public class TestImportExport {
 
       String[] args = new String[] {
           "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added 
scanner batching arg.
-          BATCH_TABLE,
+          exportTable,
           FQ_OUTPUT_DIR
       };
-      assertTrue(runExport(args));
+      assertTrue(exporter.toString(), exporter.runExport(args));
 
       FileSystem fs = FileSystem.get(UTIL.getConfiguration());
       fs.delete(new Path(FQ_OUTPUT_DIR), true);
     }
   }
-
   @Test
-  public void testWithDeletes() throws Exception {
-    String EXPORT_TABLE = "exportWithDeletes";
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
+  public void testWithDeletes() throws IOException, Throwable {
+    String exportTable = "exportWithDeletes";
+    String importTable = "importWithDeletes";
+    for (Exporter exporter : EXPORTERS) {
+      testWithDeletes(exportTable, importTable, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      UTIL.deleteTable(TableName.valueOf(importTable));
+      deleteOutput();
+    }
+  }
+  public void testWithDeletes(final String exportTable, final String importTable,
+          final Exporter exporter) throws IOException, Throwable {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA)
         .setMaxVersions(5)
         .setKeepDeletedCells(KeepDeletedCells.TRUE)
     );
-    UTIL.getHBaseAdmin().createTable(desc);
+    UTIL.getAdmin().createTable(desc);
     try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
 
       Put p = new Put(ROW1);
@@ -332,25 +442,23 @@ public class TestImportExport {
 
     String[] args = new String[] {
         "-D" + Export.RAW_SCAN + "=true",
-        EXPORT_TABLE,
+        exportTable,
         FQ_OUTPUT_DIR,
         "1000", // max number of key versions per key to export
     };
-    assertTrue(runExport(args));
-
-    String IMPORT_TABLE = "importWithDeletes";
-    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
+    assertTrue(exporter.toString(), exporter.runExport(args));
+    desc = new HTableDescriptor(TableName.valueOf(importTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA)
         .setMaxVersions(5)
         .setKeepDeletedCells(KeepDeletedCells.TRUE)
     );
-    UTIL.getHBaseAdmin().createTable(desc);
+    UTIL.getAdmin().createTable(desc);
     try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
       args = new String[] {
-          IMPORT_TABLE,
-          FQ_OUTPUT_DIR
+        importTable,
+        FQ_OUTPUT_DIR
       };
-      assertTrue(runImport(args));
+      assertTrue(exporter.toString(), runImport(args));
 
       Scan s = new Scan();
       s.setMaxVersions();
@@ -358,29 +466,36 @@ public class TestImportExport {
       ResultScanner scanner = t.getScanner(s);
       Result r = scanner.next();
       Cell[] res = r.rawCells();
-      assertTrue(CellUtil.isDeleteFamily(res[0]));
-      assertEquals(now+4, res[1].getTimestamp());
-      assertEquals(now+3, res[2].getTimestamp());
-      assertTrue(CellUtil.isDelete(res[3]));
-      assertEquals(now+2, res[4].getTimestamp());
-      assertEquals(now+1, res[5].getTimestamp());
-      assertEquals(now, res[6].getTimestamp());
+      assertTrue(exporter.toString(), CellUtil.isDeleteFamily(res[0]));
+      assertEquals(exporter.toString(), now+4, res[1].getTimestamp());
+      assertEquals(exporter.toString(), now+3, res[2].getTimestamp());
+      assertTrue(exporter.toString(), CellUtil.isDelete(res[3]));
+      assertEquals(exporter.toString(), now+2, res[4].getTimestamp());
+      assertEquals(exporter.toString(), now+1, res[5].getTimestamp());
+      assertEquals(exporter.toString(), now, res[6].getTimestamp());
     }
   }
-
-
   @Test
-  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
-    TableName EXPORT_TABLE =
-        TableName.valueOf("exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily");
-    HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
+  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws IOException, Throwable {
+    String exportTable = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
+    String importTable = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
+    for (Exporter exporter : EXPORTERS) {
+      testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily(exportTable, 
importTable, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      UTIL.deleteTable(TableName.valueOf(importTable));
+      deleteOutput();
+    }
+  }
+  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily(final String exportTable, final String importTable,
+          final Exporter exporter) throws IOException, Throwable {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA)
         .setMaxVersions(5)
         .setKeepDeletedCells(KeepDeletedCells.TRUE)
     );
-    UTIL.getHBaseAdmin().createTable(desc);
+    UTIL.getAdmin().createTable(desc);
 
-    Table exportT = UTIL.getConnection().getTable(EXPORT_TABLE);
+    Table exportT = UTIL.getConnection().getTable(desc.getTableName());
 
     //Add first version of QUAL
     Put p = new Put(ROW1);
@@ -402,26 +517,24 @@ public class TestImportExport {
 
 
     String[] args = new String[] {
-        "-D" + Export.RAW_SCAN + "=true", EXPORT_TABLE.getNameAsString(),
+        "-D" + Export.RAW_SCAN + "=true", exportTable,
         FQ_OUTPUT_DIR,
         "1000", // max number of key versions per key to export
     };
-    assertTrue(runExport(args));
-
-    String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
-    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
+    assertTrue(exporter.toString(), exporter.runExport(args));
+    desc = new HTableDescriptor(TableName.valueOf(importTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA)
         .setMaxVersions(5)
         .setKeepDeletedCells(KeepDeletedCells.TRUE)
     );
-    UTIL.getHBaseAdmin().createTable(desc);
+    UTIL.getAdmin().createTable(desc);
 
-    Table importT = UTIL.getConnection().getTable(TableName.valueOf(IMPORT_TABLE));
+    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
     args = new String[] {
-        IMPORT_TABLE,
+        importTable,
         FQ_OUTPUT_DIR
     };
-    assertTrue(runImport(args));
+    assertTrue(exporter.toString(), runImport(args));
 
     Scan s = new Scan();
     s.setMaxVersions();
@@ -434,26 +547,39 @@ public class TestImportExport {
     Result  exportedTResult =  exportedTScanner.next();
     try {
       Result.compareResults(exportedTResult, importedTResult);
-    } catch (Exception e) {
+    } catch (IOException e) {
       fail("Original and imported tables data comparision failed with 
error:"+e.getMessage());
     } finally {
       exportT.close();
       importT.close();
     }
   }
-
   /**
   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
    * attempt with invalid values.
    */
   @Test
-  public void testWithFilter() throws Exception {
+  public void testWithFilter() throws IOException, Throwable {
+      String exportTable = "exportSimpleCase_ImportWithFilter";
+      String importTable = "importWithFilter";
+    for (Exporter exporter : EXPORTERS) {
+      testWithFilter(exportTable, importTable, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      UTIL.deleteTable(TableName.valueOf(importTable));
+      deleteOutput();
+    }
+  }
+  /**
+   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
+   * attempt with invalid values.
+   */
+  public void testWithFilter(final String exportTable, final String importTable,
+          final Exporter exporter) throws IOException, Throwable {
     // Create simple table to export
-    String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
-    UTIL.getHBaseAdmin().createTable(desc);
-    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
+    UTIL.getAdmin().createTable(desc);
+    Table exportT = UTIL.getConnection().getTable(desc.getTableName());
 
     Put p1 = new Put(ROW1);
     p1.addColumn(FAMILYA, QUAL, now, QUAL);
@@ -466,43 +592,42 @@ public class TestImportExport {
     Put p2 = new Put(ROW2);
     p2.addColumn(FAMILYA, QUAL, now, QUAL);
 
-    exportTable.put(Arrays.asList(p1, p2));
+    exportT.put(Arrays.asList(p1, p2));
 
     // Export the simple table
-    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
-    assertTrue(runExport(args));
+    String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1000" };
+    assertTrue(exporter.toString(), exporter.runExport(args));
 
     // Import to a new table
-    String IMPORT_TABLE = "importWithFilter";
-    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
+    desc = new HTableDescriptor(TableName.valueOf(importTable));
     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
-    UTIL.getHBaseAdmin().createTable(desc);
+    UTIL.getAdmin().createTable(desc);
 
-    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
+    Table importT = UTIL.getConnection().getTable(desc.getTableName());
     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + 
PrefixFilter.class.getName(),
-        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), 
IMPORT_TABLE,
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), 
importTable,
         FQ_OUTPUT_DIR,
         "1000" };
-    assertTrue(runImport(args));
+    assertTrue(exporter.toString(), runImport(args));
 
     // get the count of the source table for that time range
     PrefixFilter filter = new PrefixFilter(ROW1);
-    int count = getCount(exportTable, filter);
+    int count = getCount(exportT, filter);
 
-    Assert.assertEquals("Unexpected row count between export and import 
tables", count,
-      getCount(importTable, null));
+    Assert.assertEquals("Unexpected row count between export(" + 
exporter.toString() + ") and import tables", count,
+      getCount(importT, null));
 
     // and then test that a broken command doesn't bork everything - easier here because we don't
     // need to re-run the export job
 
     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + 
Filter.class.getName(),
-        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", 
EXPORT_TABLE,
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", 
exportTable,
         FQ_OUTPUT_DIR, "1000" };
     assertFalse(runImport(args));
 
     // cleanup
-    exportTable.close();
-    importTable.close();
+    exportT.close();
+    importT.close();
   }
 
   /**
@@ -637,40 +762,50 @@ public class TestImportExport {
 
     Import.addFilterAndArguments(configuration, FilterBase.class, args);
     assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
-        configuration.get(Import.FILTER_CLASS_CONF_KEY));
+      configuration.get(Import.FILTER_CLASS_CONF_KEY));
     assertEquals("param1,param2", 
configuration.get(Import.FILTER_ARGS_CONF_KEY));
   }
-
   @Test
-  public void testDurability() throws Exception {
+  public void testDurability() throws IOException, Throwable {
+      String exportTable = "exporttestDurability";
+      String importTable = "importTestDurability1";
+      String importTableV2 = "importTestDurability2";
+    for (Exporter exporter : EXPORTERS) {
+      testDurability(exportTable, importTable, importTableV2, exporter);
+      UTIL.deleteTable(TableName.valueOf(exportTable));
+      UTIL.deleteTable(TableName.valueOf(importTable));
+      UTIL.deleteTable(TableName.valueOf(importTableV2));
+      deleteOutput();
+    }
+  }
+  public void testDurability(final String exportTable, final String importTable, final String importTable2,
+          final Exporter exporter) throws IOException, Throwable {
     // Create an export table.
-    String exportTableName = "exporttestDurability";
-    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
+    try (Table exportT = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) {
 
       // Insert some data
       Put put = new Put(ROW1);
       put.addColumn(FAMILYA, QUAL, now, QUAL);
       put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
       put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
-      exportTable.put(put);
+      exportT.put(put);
 
       put = new Put(ROW2);
       put.addColumn(FAMILYA, QUAL, now, QUAL);
       put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
       put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
-      exportTable.put(put);
+      exportT.put(put);
 
       // Run the export
-      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
-      assertTrue(runExport(args));
+      String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1000"};
+      assertTrue(exporter.toString(), exporter.runExport(args));
 
       // Create the table for import
-      String importTableName = "importTestDurability1";
-      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
+      Table importT = UTIL.createTable(TableName.valueOf(importTable), FAMILYA, 3);
 
       // Register the wal listener for the import table
       HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
-          .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+          .getOnlineRegions(importT.getName()).get(0).getRegionInfo();
       TableWALActionListener walListener = new TableWALActionListener(region);
       WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
       wal.registerWALActionsListener(walListener);
@@ -678,27 +813,26 @@ public class TestImportExport {
       // Run the import with SKIP_WAL
       args =
           new String[] { "-D" + Import.WAL_DURABILITY + "=" + 
Durability.SKIP_WAL.name(),
-              importTableName, FQ_OUTPUT_DIR };
-      assertTrue(runImport(args));
+              importTable, FQ_OUTPUT_DIR };
+      assertTrue(exporter.toString(), runImport(args));
       //Assert that the WAL is not visited
-      assertTrue(!walListener.isWALVisited());
+      assertTrue(exporter.toString(), !walListener.isWALVisited());
       //Ensure that the count is 2 (only one version of key value is obtained)
-      assertTrue(getCount(importTable, null) == 2);
+      assertTrue(exporter.toString(), getCount(importT, null) == 2);
 
       // Run the import with the default durability option
-      importTableName = "importTestDurability2";
-      importTable = UTIL.createTable(TableName.valueOf(importTableName), 
FAMILYA, 3);
+      importT = UTIL.createTable(TableName.valueOf(importTable2), FAMILYA, 3);
       region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
-          .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+          .getOnlineRegions(importT.getName()).get(0).getRegionInfo();
       wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
       walListener = new TableWALActionListener(region);
       wal.registerWALActionsListener(walListener);
-      args = new String[] { importTableName, FQ_OUTPUT_DIR };
-      assertTrue(runImport(args));
+      args = new String[] { importTable2, FQ_OUTPUT_DIR };
+      assertTrue(exporter.toString(), runImport(args));
       //Assert that the WAL is visited
-      assertTrue(walListener.isWALVisited());
+      assertTrue(exporter.toString(), walListener.isWALVisited());
       //Ensure that the count is 2 (only one version of key value is obtained)
-      assertTrue(getCount(importTable, null) == 2);
+      assertTrue(exporter.toString(), getCount(importT, null) == 2);
     }
   }
 
@@ -727,4 +861,7 @@ public class TestImportExport {
       return isVisited;
     }
   }
+  public interface Exporter {
+    boolean runExport(final String[] args) throws Throwable;
+  }
 }
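
A final hedged sketch, not part of this commit: the test above loads the endpoint
cluster-wide through CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, but on a real
deployment it can also be attached per table, assuming the class is on the region
servers' classpath. The helper name below is illustrative:

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.coprocessor.ExportEndpoint;

public class AttachExportEndpoint {
  static void attach(Admin admin, TableName table) throws Exception {
    HTableDescriptor desc = admin.getTableDescriptor(table);
    if (!desc.hasCoprocessor(ExportEndpoint.class.getName())) {
      desc.addCoprocessor(ExportEndpoint.class.getName());
      admin.modifyTable(table, desc); // async in some versions; may need to wait for completion
    }
  }
}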
