http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java deleted file mode 100644 index 1d4d37b..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Facade to create Cells for HFileOutputFormat. The created Cells are of <code>Put</code> type. - */ -@InterfaceAudience.Public -public class CellCreator { - - public static final String VISIBILITY_EXP_RESOLVER_CLASS = - "hbase.mapreduce.visibility.expression.resolver.class"; - - private VisibilityExpressionResolver visExpResolver; - - public CellCreator(Configuration conf) { - Class<? 
extends VisibilityExpressionResolver> clazz = conf.getClass( - VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class, - VisibilityExpressionResolver.class); - this.visExpResolver = ReflectionUtils.newInstance(clazz, conf); - this.visExpResolver.init(); - } - - /** - * @param row row key - * @param roffset row offset - * @param rlength row length - * @param family family name - * @param foffset family offset - * @param flength family length - * @param qualifier column qualifier - * @param qoffset qualifier offset - * @param qlength qualifier length - * @param timestamp version timestamp - * @param value column value - * @param voffset value offset - * @param vlength value length - * @return created Cell - * @throws IOException - */ - public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, - byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset, - int vlength) throws IOException { - return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength, - timestamp, value, voffset, vlength, (List<Tag>)null); - } - - /** - * @param row row key - * @param roffset row offset - * @param rlength row length - * @param family family name - * @param foffset family offset - * @param flength family length - * @param qualifier column qualifier - * @param qoffset qualifier offset - * @param qlength qualifier length - * @param timestamp version timestamp - * @param value column value - * @param voffset value offset - * @param vlength value length - * @param visExpression visibility expression to be associated with cell - * @return created Cell - * @throws IOException - */ - @Deprecated - public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, - byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset, - int vlength, String visExpression) throws IOException { - List<Tag> visTags = null; - if (visExpression != null) { - visTags = this.visExpResolver.createVisibilityExpTags(visExpression); - } - return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, - qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags); - } - - /** - * @param row row key - * @param roffset row offset - * @param rlength row length - * @param family family name - * @param foffset family offset - * @param flength family length - * @param qualifier column qualifier - * @param qoffset qualifier offset - * @param qlength qualifier length - * @param timestamp version timestamp - * @param value column value - * @param voffset value offset - * @param vlength value length - * @param tags - * @return created Cell - * @throws IOException - */ - public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, - byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset, - int vlength, List<Tag> tags) throws IOException { - return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, - qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags); - } - - /** - * @return Visibility expression resolver - */ - public VisibilityExpressionResolver getVisibilityExpressionResolver() { - return this.visExpResolver; - } -}
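For reference, a minimal sketch of driving the CellCreator facade above from a mapper feeding HFileOutputFormat2. The mapper class, column family, qualifier, and tab-separated input layout are illustrative assumptions, not part of the deleted file; only the CellCreator constructor and create(...) call correspond to the code above.

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.CellCreator;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ExampleCellMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Cell> {
      private CellCreator cellCreator;
      private final byte[] family = Bytes.toBytes("f");    // assumed column family
      private final byte[] qualifier = Bytes.toBytes("q"); // assumed qualifier

      @Override
      protected void setup(Context context) {
        // Picks up hbase.mapreduce.visibility.expression.resolver.class from the job conf.
        cellCreator = new CellCreator(context.getConfiguration());
      }

      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        // Assumed input layout: row<TAB>value. All arguments are (array, offset, length) triples.
        String[] parts = line.toString().split("\t", 2);
        byte[] row = Bytes.toBytes(parts[0]);
        byte[] value = Bytes.toBytes(parts[1]);
        Cell cell = cellCreator.create(row, 0, row.length, family, 0, family.length,
            qualifier, 0, qualifier.length, System.currentTimeMillis(), value, 0, value.length);
        context.write(new ImmutableBytesWritable(row), cell);
      }
    }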
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java deleted file mode 100644 index 21b8556..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ /dev/null @@ -1,386 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * Tool used to copy a table to another one, which can be on a different cluster setup. - * It is also configurable with a start and end time, as well as a specification - * of the region server implementation if different from the local cluster. - */ -@InterfaceAudience.Public -public class CopyTable extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(CopyTable.class); - - final static String NAME = "copytable"; - long startTime = 0; - long endTime = HConstants.LATEST_TIMESTAMP; - int batch = Integer.MAX_VALUE; - int cacheRow = -1; - int versions = -1; - String tableName = null; - String startRow = null; - String stopRow = null; - String dstTableName = null; - String peerAddress = null; - String families = null; - boolean allCells = false; - static boolean shuffle = false; - - boolean bulkload = false; - Path bulkloadDir = null; - - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - /** - * Sets up the actual job. - * - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails.
- */ - public Job createSubmittableJob(String[] args) - throws IOException { - if (!doCommandLine(args)) { - return null; - } - - Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); - job.setJarByClass(CopyTable.class); - Scan scan = new Scan(); - - scan.setBatch(batch); - scan.setCacheBlocks(false); - - if (cacheRow > 0) { - scan.setCaching(cacheRow); - } else { - scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100)); - } - - scan.setTimeRange(startTime, endTime); - - if (allCells) { - scan.setRaw(true); - } - if (shuffle) { - job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true"); - } - if (versions >= 0) { - scan.setMaxVersions(versions); - } - - if (startRow != null) { - scan.setStartRow(Bytes.toBytesBinary(startRow)); - } - - if (stopRow != null) { - scan.setStopRow(Bytes.toBytesBinary(stopRow)); - } - - if(families != null) { - String[] fams = families.split(","); - Map<String,String> cfRenameMap = new HashMap<>(); - for(String fam : fams) { - String sourceCf; - if(fam.contains(":")) { - // fam looks like "sourceCfName:destCfName" - String[] srcAndDest = fam.split(":", 2); - sourceCf = srcAndDest[0]; - String destCf = srcAndDest[1]; - cfRenameMap.put(sourceCf, destCf); - } else { - // fam is just "sourceCf" - sourceCf = fam; - } - scan.addFamily(Bytes.toBytes(sourceCf)); - } - Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); - } - job.setNumReduceTasks(0); - - if (bulkload) { - TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, - null, job); - - // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. - TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); - - FileSystem fs = FileSystem.get(getConf()); - Random rand = new Random(); - Path root = new Path(fs.getWorkingDirectory(), "copytable"); - fs.mkdirs(root); - while (true) { - bulkloadDir = new Path(root, "" + rand.nextLong()); - if (!fs.exists(bulkloadDir)) { - break; - } - } - - System.out.println("HFiles will be stored at " + this.bulkloadDir); - HFileOutputFormat2.setOutputPath(job, bulkloadDir); - try (Connection conn = ConnectionFactory.createConnection(getConf()); - Admin admin = conn.getAdmin()) { - HFileOutputFormat2.configureIncrementalLoadMap(job, - admin.listTableDescriptor((TableName.valueOf(dstTableName)))); - } - } else { - TableMapReduceUtil.initTableMapperJob(tableName, scan, - Import.Importer.class, null, null, job); - - TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null, - null); - } - - return job; - } - - /* - * @param errorMsg Error message. Can be null. 
- */ - private static void printUsage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " + - "[--new.name=NEW] [--peer.adr=ADR] <tablename>"); - System.err.println(); - System.err.println("Options:"); - System.err.println(" rs.class hbase.regionserver.class of the peer cluster"); - System.err.println(" specify if different from current cluster"); - System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster"); - System.err.println(" startrow the start row"); - System.err.println(" stoprow the stop row"); - System.err.println(" starttime beginning of the time range (unixtime in millis)"); - System.err.println(" without endtime means from starttime to forever"); - System.err.println(" endtime end of the time range. Ignored if no starttime specified."); - System.err.println(" versions number of cell versions to copy"); - System.err.println(" new.name new table's name"); - System.err.println(" peer.adr Address of the peer cluster given in the format"); - System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client" - + ".port:zookeeper.znode.parent"); - System.err.println(" families comma-separated list of families to copy"); - System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); - System.err.println(" To keep the same name, just give \"cfName\""); - System.err.println(" all.cells also copy delete markers and deleted cells"); - System.err.println(" bulkload Write input into HFiles and bulk load to the destination " - + "table"); - System.err.println(); - System.err.println("Args:"); - System.err.println(" tablename Name of the table to copy"); - System.err.println(); - System.err.println("Examples:"); - System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:"); - System.err.println(" $ hbase " + - "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " + - "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable "); - System.err.println("For performance consider the following general option:\n" - + " It is recommended that you set the following to >=100. A higher value uses more memory but\n" - + " decreases the round trip time to the server and may increase performance.\n" - + " -Dhbase.client.scanner.caching=100\n" - + " The following should always be set to false, to prevent writing data twice, which may produce \n" - + " inaccurate results.\n" - + " -Dmapreduce.map.speculative=false"); - } - - private boolean doCommandLine(final String[] args) { - // Process command-line args. TODO: Better cmd-line processing - // (but hopefully something not as painful as cli options). 
- if (args.length < 1) { - printUsage(null); - return false; - } - try { - for (int i = 0; i < args.length; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(null); - return false; - } - - final String startRowArgKey = "--startrow="; - if (cmd.startsWith(startRowArgKey)) { - startRow = cmd.substring(startRowArgKey.length()); - continue; - } - - final String stopRowArgKey = "--stoprow="; - if (cmd.startsWith(stopRowArgKey)) { - stopRow = cmd.substring(stopRowArgKey.length()); - continue; - } - - final String startTimeArgKey = "--starttime="; - if (cmd.startsWith(startTimeArgKey)) { - startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); - continue; - } - - final String endTimeArgKey = "--endtime="; - if (cmd.startsWith(endTimeArgKey)) { - endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); - continue; - } - - final String batchArgKey = "--batch="; - if (cmd.startsWith(batchArgKey)) { - batch = Integer.parseInt(cmd.substring(batchArgKey.length())); - continue; - } - - final String cacheRowArgKey = "--cacheRow="; - if (cmd.startsWith(cacheRowArgKey)) { - cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length())); - continue; - } - - final String versionsArgKey = "--versions="; - if (cmd.startsWith(versionsArgKey)) { - versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); - continue; - } - - final String newNameArgKey = "--new.name="; - if (cmd.startsWith(newNameArgKey)) { - dstTableName = cmd.substring(newNameArgKey.length()); - continue; - } - - final String peerAdrArgKey = "--peer.adr="; - if (cmd.startsWith(peerAdrArgKey)) { - peerAddress = cmd.substring(peerAdrArgKey.length()); - continue; - } - - final String familiesArgKey = "--families="; - if (cmd.startsWith(familiesArgKey)) { - families = cmd.substring(familiesArgKey.length()); - continue; - } - - if (cmd.startsWith("--all.cells")) { - allCells = true; - continue; - } - - if (cmd.startsWith("--bulkload")) { - bulkload = true; - continue; - } - - if (cmd.startsWith("--shuffle")) { - shuffle = true; - continue; - } - - if (i == args.length-1) { - tableName = cmd; - } else { - printUsage("Invalid argument '" + cmd + "'"); - return false; - } - } - if (dstTableName == null && peerAddress == null) { - printUsage("At least a new table name or a " + - "peer address must be specified"); - return false; - } - if ((endTime != 0) && (startTime > endTime)) { - printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); - return false; - } - - if (bulkload && peerAddress != null) { - printUsage("Remote bulkload is not supported!"); - return false; - } - - // set dstTableName if necessary - if (dstTableName == null) { - dstTableName = tableName; - } - } catch (Exception e) { - e.printStackTrace(); - printUsage("Can't start because " + e.getMessage()); - return false; - } - return true; - } - - /** - * Main entry point. - * - * @param args The command line parameters. - * @throws Exception When running the job fails. 
*/ - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args); - System.exit(ret); - } - - @Override - public int run(String[] args) throws Exception { - Job job = createSubmittableJob(args); - if (job == null) return 1; - if (!job.waitForCompletion(true)) { - LOG.info("Map-reduce job failed!"); - if (bulkload) { - LOG.info("Files are not bulkloaded!"); - } - return 1; - } - int code = 0; - if (bulkload) { - code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(), - this.dstTableName}); - if (code == 0) { - // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can rerun - // LoadIncrementalHFiles. - FileSystem fs = FileSystem.get(this.getConf()); - if (!fs.delete(this.bulkloadDir, true)) { - LOG.error("Deleting folder " + bulkloadDir + " failed!"); - code = 1; - } - } - } - return code; - } -}
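For orientation, a minimal sketch of launching the CopyTable tool above programmatically through ToolRunner; the table names and time range are placeholder values, equivalent to the command-line form shown in printUsage:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.util.ToolRunner;

    public class CopyTableLauncher {
      public static void main(String[] args) throws Exception {
        // Same as: hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
        //   --starttime=1265875194289 --endtime=1265878794289 --new.name=TestTableCopy TestTable
        int exit = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(),
            new String[] { "--starttime=1265875194289", "--endtime=1265878794289",
                "--new.name=TestTableCopy", "TestTable" });
        System.exit(exit);
      }
    }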
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java deleted file mode 100644 index 004ee5c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY; -import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; -import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.security.visibility.Authorizations; -import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; -import org.apache.hadoop.hbase.security.visibility.VisibilityLabelOrdinalProvider; -import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This implementation creates tags by expanding the expression using label ordinals. Labels will be - * serialized in sorted order of their ordinals. - */ -@InterfaceAudience.Private -public class DefaultVisibilityExpressionResolver implements VisibilityExpressionResolver { - private static final Log LOG = LogFactory.getLog(DefaultVisibilityExpressionResolver.class); - - private Configuration conf; - private final Map<String, Integer> labels = new HashMap<>(); - - @Override - public Configuration getConf() { - return this.conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public void init() { - // Read all the labels and their ordinals. - // This scan should be done by a user with global_admin privileges. Ensure that it works - Table labelsTable = null; - Connection connection = null; - try { - connection = ConnectionFactory.createConnection(conf); - try { - labelsTable = connection.getTable(LABELS_TABLE_NAME); - } catch (IOException e) { - LOG.error("Error opening 'labels' table", e); - return; - } - Scan scan = new Scan(); - scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL)); - scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); - ResultScanner scanner = null; - try { - scanner = labelsTable.getScanner(scan); - Result next = null; - while ((next = scanner.next()) != null) { - byte[] row = next.getRow(); - byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); - labels.put(Bytes.toString(value), Bytes.toInt(row)); - } - } catch (TableNotFoundException e) { - // Table not found.
So just return - return; - } catch (IOException e) { - LOG.error("Error scanning 'labels' table", e); - } finally { - if (scanner != null) scanner.close(); - } - } catch (IOException ioe) { - LOG.error("Failed reading 'labels' tags", ioe); - return; - } finally { - if (labelsTable != null) { - try { - labelsTable.close(); - } catch (IOException ioe) { - LOG.warn("Error closing 'labels' table", ioe); - } - } - if (connection != null) - try { - connection.close(); - } catch (IOException ioe) { - LOG.warn("Failed close of temporary connection", ioe); - } - } - } - - @Override - public List<Tag> createVisibilityExpTags(String visExpression) throws IOException { - VisibilityLabelOrdinalProvider provider = new VisibilityLabelOrdinalProvider() { - @Override - public int getLabelOrdinal(String label) { - Integer ordinal = null; - ordinal = labels.get(label); - if (ordinal != null) { - return ordinal.intValue(); - } - return VisibilityConstants.NON_EXIST_LABEL_ORDINAL; - } - - @Override - public String getLabel(int ordinal) { - // Unused - throw new UnsupportedOperationException( - "getLabel should not be used in VisibilityExpressionResolver"); - } - }; - return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java deleted file mode 100644 index 9737b55..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; -import org.apache.hadoop.hbase.snapshot.ExportSnapshot; -import org.apache.hadoop.util.ProgramDriver; - -/** - * Driver for hbase mapreduce jobs. Select which to run by passing - * name of job to this main. 
- */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) -@InterfaceStability.Stable -public class Driver { - /** - * @param args - * @throws Throwable - */ - public static void main(String[] args) throws Throwable { - ProgramDriver pgd = new ProgramDriver(); - - pgd.addClass(RowCounter.NAME, RowCounter.class, - "Count rows in HBase table."); - pgd.addClass(CellCounter.NAME, CellCounter.class, - "Count cells in HBase table."); - pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS."); - pgd.addClass(Import.NAME, Import.class, "Import data written by Export."); - pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format."); - pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class, - "Complete a bulk data load."); - pgd.addClass(CopyTable.NAME, CopyTable.class, - "Export a table from local cluster to peer cluster."); - pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" + - " the data from tables in two different clusters. WARNING: It" + - " doesn't work for incrementColumnValues'd cells since the" + - " timestamp is changed after being appended to the log."); - pgd.addClass(WALPlayer.NAME, WALPlayer.class, "Replay WAL files."); - pgd.addClass(ExportSnapshot.NAME, ExportSnapshot.class, "Export" + - " the specific snapshot to a given FileSystem."); - - ProgramDriver.class.getMethod("driver", new Class [] {String[].class}). - invoke(pgd, new Object[]{args}); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java deleted file mode 100644 index 4c01528..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java +++ /dev/null @@ -1,197 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.IncompatibleFilterException; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.filter.RegexStringComparator; -import org.apache.hadoop.hbase.filter.RowFilter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** -* Export an HBase table. -* Writes content to sequence files up in HDFS. Use {@link Import} to read it -* back in again. -*/ -@InterfaceAudience.Public -public class Export extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Export.class); - final static String NAME = "export"; - final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; - final static String EXPORT_BATCHING = "hbase.export.scanner.batch"; - - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - /** - * Sets up the actual job. - * - * @param conf The current configuration. - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails. - */ - public static Job createSubmittableJob(Configuration conf, String[] args) - throws IOException { - String tableName = args[0]; - Path outputDir = new Path(args[1]); - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); - job.setJobName(NAME + "_" + tableName); - job.setJarByClass(Export.class); - // Set optional scan parameters - Scan s = getConfiguredScanForJob(conf, args); - IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job); - // No reducers. Just write straight to output files. - job.setNumReduceTasks(0); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(Result.class); - FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs. - return job; - } - - private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException { - Scan s = new Scan(); - // Optional arguments. - // Set Scan Versions - int versions = args.length > 2? Integer.parseInt(args[2]): 1; - s.setMaxVersions(versions); - // Set Scan Range - long startTime = args.length > 3? Long.parseLong(args[3]): 0L; - long endTime = args.length > 4? 
Long.parseLong(args[4]): Long.MAX_VALUE; - s.setTimeRange(startTime, endTime); - // Set cache blocks - s.setCacheBlocks(false); - // set Start and Stop row - if (conf.get(TableInputFormat.SCAN_ROW_START) != null) { - s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START))); - } - if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) { - s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP))); - } - // Set Scan Column Family - boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN)); - if (raw) { - s.setRaw(raw); - } - for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) { - s.addFamily(Bytes.toBytes(columnFamily)); - } - // Set RowFilter or Prefix Filter if applicable. - Filter exportFilter = getExportFilter(args); - if (exportFilter!= null) { - LOG.info("Setting Scan Filter for Export."); - s.setFilter(exportFilter); - } - - int batching = conf.getInt(EXPORT_BATCHING, -1); - if (batching != -1){ - try { - s.setBatch(batching); - } catch (IncompatibleFilterException e) { - LOG.error("Batching could not be set", e); - } - } - LOG.info("versions=" + versions + ", starttime=" + startTime + - ", endtime=" + endTime + ", keepDeletedCells=" + raw); - return s; - } - - private static Filter getExportFilter(String[] args) { - Filter exportFilter = null; - String filterCriteria = (args.length > 5) ? args[5]: null; - if (filterCriteria == null) return null; - if (filterCriteria.startsWith("^")) { - String regexPattern = filterCriteria.substring(1, filterCriteria.length()); - exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern)); - } else { - exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria)); - } - return exportFilter; - } - - /* - * @param errorMsg Error message. Can be null. - */ - private static void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + - "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n"); - System.err.println(" Note: -D properties will be applied to the conf used. 
"); - System.err.println(" For example: "); - System.err.println(" -D mapreduce.output.fileoutputformat.compress=true"); - System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec"); - System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK"); - System.err.println(" Additionally, the following SCAN properties can be specified"); - System.err.println(" to control/limit what is exported.."); - System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ..."); - System.err.println(" -D " + RAW_SCAN + "=true"); - System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>"); - System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>"); - System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the export"); - System.err.println("For performance consider the following properties:\n" - + " -Dhbase.client.scanner.caching=100\n" - + " -Dmapreduce.map.speculative=false\n" - + " -Dmapreduce.reduce.speculative=false"); - System.err.println("For tables with very wide rows consider setting the batch size as below:\n" - + " -D" + EXPORT_BATCHING + "=10"); - } - - - @Override - public int run(String[] args) throws Exception { - if (args.length < 2) { - usage("Wrong number of arguments: " + args.length); - return -1; - } - Job job = createSubmittableJob(getConf(), args); - return (job.waitForCompletion(true) ? 0 : 1); - } - - /** - * Main entry point. - * @param args The command line parameters. - * @throws Exception When running the job fails. - */ - public static void main(String[] args) throws Exception { - int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args); - System.exit(errCode); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java deleted file mode 100644 index dc30c6e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java deleted file mode 100644 index dc30c6e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; - -/** - * Extract grouping columns from the input record. - */ -@InterfaceAudience.Public -public class GroupingTableMapper -extends TableMapper<ImmutableBytesWritable,Result> implements Configurable { - - /** - * JobConf parameter to specify the columns used to produce the key passed to - * collect from the map phase. - */ - public static final String GROUP_COLUMNS = - "hbase.mapred.groupingtablemap.columns"; - - /** The grouping columns. */ - protected byte [][] columns; - /** The current configuration. */ - private Configuration conf = null; - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table The table to be processed. - * @param scan The scan with the columns etc. - * @param groupColumns A space-separated list of columns used to form the - * key used in collect. - * @param mapper The mapper class. - * @param job The current job. - * @throws IOException When setting up the job fails. - */ - @SuppressWarnings("unchecked") - public static void initJob(String table, Scan scan, String groupColumns, - Class<? extends TableMapper> mapper, Job job) throws IOException { - TableMapReduceUtil.initTableMapperJob(table, scan, mapper, - ImmutableBytesWritable.class, Result.class, job); - job.getConfiguration().set(GROUP_COLUMNS, groupColumns); - } - - /** - * Extract the grouping columns from value to construct a new key. Pass the - * new key and value to reduce. If any of the grouping columns are not found - * in the value, the record is skipped. - * - * @param key The current key. - * @param value The current value. - * @param context The current context. - * @throws IOException When writing the record fails. - * @throws InterruptedException When the job is aborted. - */ - @Override - public void map(ImmutableBytesWritable key, Result value, Context context) - throws IOException, InterruptedException { - byte[][] keyVals = extractKeyValues(value); - if(keyVals != null) { - ImmutableBytesWritable tKey = createGroupKey(keyVals); - context.write(tKey, value); - } - } - - /** - * Extract column values from the current record. This method returns - * null if any of the columns are not found. - * <p> - * Override this method if you want to deal with nulls differently. - * - * @param r The current values. - * @return Array of byte values.
- */ - protected byte[][] extractKeyValues(Result r) { - byte[][] keyVals = null; - ArrayList<byte[]> foundList = new ArrayList<>(); - int numCols = columns.length; - if (numCols > 0) { - for (Cell value: r.listCells()) { - byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value), - CellUtil.cloneQualifier(value)); - for (int i = 0; i < numCols; i++) { - if (Bytes.equals(column, columns[i])) { - foundList.add(CellUtil.cloneValue(value)); - break; - } - } - } - if(foundList.size() == numCols) { - keyVals = foundList.toArray(new byte[numCols][]); - } - } - return keyVals; - } - - /** - * Create a key by concatenating multiple column values. - * <p> - * Override this function in order to produce different types of keys. - * - * @param vals The current key/values. - * @return A key generated by concatenating multiple column values. - */ - protected ImmutableBytesWritable createGroupKey(byte[][] vals) { - if(vals == null) { - return null; - } - StringBuilder sb = new StringBuilder(); - for(int i = 0; i < vals.length; i++) { - if(i > 0) { - sb.append(" "); - } - sb.append(Bytes.toString(vals[i])); - } - return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString())); - } - - /** - * Returns the current configuration. - * - * @return The current configuration. - * @see org.apache.hadoop.conf.Configurable#getConf() - */ - @Override - public Configuration getConf() { - return conf; - } - - /** - * Sets the configuration. This is used to set up the grouping details. - * - * @param configuration The configuration to set. - * @see org.apache.hadoop.conf.Configurable#setConf( - * org.apache.hadoop.conf.Configuration) - */ - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - String[] cols = conf.get(GROUP_COLUMNS, "").split(" "); - columns = new byte[cols.length][]; - for(int i = 0; i < cols.length; i++) { - columns[i] = Bytes.toBytes(cols[i]); - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java deleted file mode 100644 index e90d5c1..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple MR input format for HFiles. - * This code was borrowed from the Apache Crunch project and - * updated to the recent version of HBase. - */ -public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> { - - private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class); - - /** - * File filter that removes all "hidden" files. This might be something worth removing from - * a more general purpose utility; it accounts for the presence of metadata files created - * in the way we're doing exports. - */ - static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() { - @Override - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }; - - /** - * Record reader for HFiles. - */ - private static class HFileRecordReader extends RecordReader<NullWritable, Cell> { - - private Reader in; - protected Configuration conf; - private HFileScanner scanner; - - /** - * A private cache of the key value so it doesn't need to be loaded twice from the scanner. - */ - private Cell value = null; - private long count; - private boolean seeked = false; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - FileSplit fileSplit = (FileSplit) split; - conf = context.getConfiguration(); - Path path = fileSplit.getPath(); - FileSystem fs = path.getFileSystem(conf); - LOG.info("Initialize HFileRecordReader for {}", path); - this.in = HFile.createReader(fs, path, conf); - - // The file info must be loaded before the scanner can be used. - // This seems like a bug in HBase, but it's easily worked around. - this.in.loadFileInfo(); - this.scanner = in.getScanner(false, false); - - } - - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - boolean hasNext; - if (!seeked) { - LOG.info("Seeking to start"); - hasNext = scanner.seekTo(); - seeked = true; - } else { - hasNext = scanner.next(); - } - if (!hasNext) { - return false; - } - value = scanner.getCell(); - count++; - return true; - } - - @Override - public NullWritable getCurrentKey() throws IOException, InterruptedException { - return NullWritable.get(); - } - - @Override - public Cell getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to - // the start row, but better than nothing anyway.
- return 1.0f * count / in.getEntries(); - } - - @Override - public void close() throws IOException { - if (in != null) { - in.close(); - in = null; - } - } - } - - @Override - protected List<FileStatus> listStatus(JobContext job) throws IOException { - List<FileStatus> result = new ArrayList<FileStatus>(); - - // Explode out directories that match the original FileInputFormat filters - // since HFiles are written to directories where the - // directory name is the column name - for (FileStatus status : super.listStatus(job)) { - if (status.isDirectory()) { - FileSystem fs = status.getPath().getFileSystem(job.getConfiguration()); - for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) { - result.add(match); - } - } else { - result.add(status); - } - } - return result; - } - - @Override - public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new HFileRecordReader(); - } - - @Override - protected boolean isSplitable(JobContext context, Path filename) { - // This file isn't splittable. - return false; - } -} \ No newline at end of file
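Finally, a minimal sketch of wiring the HFileInputFormat above into a map-only job; the driver class, counter names, and input path are illustrative assumptions. It counts the cells in a directory of HFiles (for example, the output of CopyTable --bulkload):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class HFileCellCount {
      static class CountMapper extends Mapper<NullWritable, Cell, NullWritable, NullWritable> {
        @Override
        protected void map(NullWritable key, Cell cell, Context context) {
          // HFileRecordReader hands over one Cell per call; just count them.
          context.getCounter("hfile", "cells").increment(1);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "hfile-cell-count");
        job.setJarByClass(HFileCellCount.class);
        job.setInputFormatClass(HFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(CountMapper.class);
        job.setNumReduceTasks(0); // map-only; the counters are the output
        job.setOutputFormatClass(NullOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }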