// New file: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles.
 * This code was borrowed from Apache Crunch project.
 * Updated to the recent version of HBase.
 *
 * <p>Each input file is handed to a single mapper as one unsplittable split; the mapper receives
 * every {@link Cell} of the file as the value, with a {@link NullWritable} key.
 */
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files, i.e. files whose name starts with {@code _} or
   * {@code .}. This might be something worth removing from a more general purpose utility; it
   * accounts for the presence of metadata files created in the way we're doing exports.
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles. Iterates over every cell of a single HFile from start to end.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    /** Number of cells returned so far; used only for progress reporting. */
    private long count;
    /** Whether the scanner has already been positioned on the first cell. */
    private boolean seeked = false;

    /**
     * Opens the HFile named by the split and prepares a scanner over it.
     *
     * @param split the split to read; must be a {@link FileSplit}
     * @param context task context supplying the job configuration
     * @throws IOException if the file cannot be opened or its file info cannot be loaded
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      Reader reader = HFile.createReader(fs, path, conf);
      try {
        // The file info must be loaded before the scanner can be used.
        // This seems like a bug in HBase, but it's easily worked around.
        reader.loadFileInfo();
        // NOTE(review): arguments are presumably (cacheBlocks, pread) -- confirm against
        // HFile.Reader#getScanner.
        this.scanner = reader.getScanner(false, false);
      } catch (IOException | RuntimeException e) {
        // Set-up failed part-way: release the reader here, because close() only releases
        // what has been published to the 'in' field.
        try {
          reader.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }
      this.in = reader;
    }

    /**
     * Advances to the next cell. The first call seeks the scanner to the start of the file.
     *
     * @return {@code true} if a cell is available via {@link #getCurrentValue()},
     *         {@code false} once the file is exhausted (or empty)
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    /** @return always the {@link NullWritable} singleton; HFile records carry no separate key */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    /** @return the cell loaded by the last successful {@link #nextKeyValue()} call */
    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    /**
     * Estimates progress as the fraction of the file's entries returned so far.
     *
     * @return a value in {@code [0.0, 1.0]}; never {@code NaN}
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
      // the start row, but better than nothing anyway.
      Reader reader = in;
      long entries = (reader == null) ? 0L : reader.getEntries();
      if (entries <= 0) {
        // Empty file, or reader already closed: dividing would yield NaN (0/0) or throw an NPE.
        // Report "done" once the single seek has been attempted, "not started" before that.
        return seeked ? 1.0f : 0.0f;
      }
      return Math.min(1.0f, (float) count / entries);
    }

    /** Closes the underlying HFile reader; safe to call more than once. */
    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  /**
   * Lists the input files, expanding one level of directories.
   *
   * @param job the job whose input paths are listed
   * @return the plain files found, plus the non-hidden children of every directory found
   * @throws IOException if the file system cannot be listed
   */
  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters
    // since HFiles are written to directories where the
    // directory name is the column name
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  /**
   * Creates an uninitialized record reader; the framework is expected to call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} on it before use.
   */
  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // This file isn't splittable.
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 718e88b..f59e24c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -100,7 +100,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * Tool to load the output of HFileOutputFormat into an existing table. */ @@ -116,7 +115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; public final static String CREATE_TABLE_CONF_KEY = "create.table"; - public final static String SILENCE_CONF_KEY = "ignore.unmatched.families"; + public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; public final static String ALWAYS_COPY_FILES = "always.copy.files"; // We use a '.' 
prefix which is ignored when walking directory trees @@ -168,7 +167,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D" - + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" + + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" + "\n"); } @@ -530,7 +529,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { boolean success = false; try { LOG.debug("Going to connect to server " + getLocation() + " for row " - + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); + + Bytes.toStringBinary(getRow()) + " with hfile group " + + LoadIncrementalHFiles.this.toString( famPaths)); byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { secureClient = new SecureBulkLoadClient(getConf(), table); @@ -1047,6 +1047,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } } + private final String toString(List<Pair<byte[], String>> list) { + StringBuffer sb = new StringBuffer(); + sb.append("["); + if(list != null){ + for(Pair<byte[], String> pair: list) { + sb.append("{"); + sb.append(Bytes.toStringBinary(pair.getFirst())); + sb.append(","); + sb.append(pair.getSecond()); + sb.append("}"); + } + } + sb.append("]"); + return sb.toString(); + } private boolean isSecureBulkLoadEndpointAvailable() { String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); @@ -1245,7 +1260,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try (Table table = 
connection.getTable(tableName); RegionLocator locator = connection.getRegionLocator(tableName)) { - boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, "")); + boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, "")); if (dirPath != null) { doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 8514ace..0ca78b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -27,22 +27,26 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.io.Writable; import 
org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.StringUtils; /** * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. @@ -169,7 +173,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { temp = reader.next(currentEntry); i++; } catch (EOFException x) { - LOG.info("Corrupted entry detected. Ignoring the rest of the file." + LOG.warn("Corrupted entry detected. Ignoring the rest of the file." + " (This is normal when a RegionServer crashed.)"); return false; } @@ -231,29 +235,37 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir")); - + Path[] inputPaths = getInputPaths(conf); long startTime = conf.getLong(startKey, Long.MIN_VALUE); long endTime = conf.getLong(endKey, Long.MAX_VALUE); - FileSystem fs = inputDir.getFileSystem(conf); - List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime); - - List<InputSplit> splits = new ArrayList<>(files.size()); - for (FileStatus file : files) { + List<FileStatus> allFiles = new ArrayList<FileStatus>(); + for(Path inputPath: inputPaths){ + FileSystem fs = inputPath.getFileSystem(conf); + List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime); + allFiles.addAll(files); + } + List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); + for (FileStatus file : allFiles) { splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); } return splits; } + private Path[] getInputPaths(Configuration conf) { + String inpDirs = 
conf.get("mapreduce.input.fileinputformat.inputdir"); + return StringUtils.stringToPath(inpDirs.split(",")); + } + private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) throws IOException { List<FileStatus> result = new ArrayList<>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); - FileStatus[] files = fs.listStatus(dir); - if (files == null) return Collections.emptyList(); - for (FileStatus file : files) { + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir); + if (!iter.hasNext()) return Collections.emptyList(); + while (iter.hasNext()) { + LocatedFileStatus file = iter.next(); if (file.isDirectory()) { // recurse into sub directories result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); @@ -264,7 +276,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { try { long fileStartTime = Long.parseLong(name.substring(idx+1)); if (fileStartTime <= endTime) { - LOG.info("Found: " + name); + LOG.info("Found: " + file); result.add(file); } } catch (NumberFormatException x) { http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index cca2041..d16dcf5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -70,9 +70,9 @@ import org.apache.hadoop.util.ToolRunner; public class WALPlayer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(WALPlayer.class); final static String NAME = "WALPlayer"; - final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; - final static String TABLES_KEY = "wal.input.tables"; - final 
static String TABLE_MAP_KEY = "wal.input.tablesmap"; + public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; + public final static String TABLES_KEY = "wal.input.tables"; + public final static String TABLE_MAP_KEY = "wal.input.tablesmap"; // This relies on Hadoop Configuration to handle warning about deprecated configs and // to set the correct non-deprecated configs when an old one shows up. @@ -84,6 +84,9 @@ public class WALPlayer extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + public WALPlayer(){ + } + protected WALPlayer(final Configuration c) { super(c); } @@ -93,7 +96,7 @@ public class WALPlayer extends Configured implements Tool { * This one can be used together with {@link KeyValueSortReducer} */ static class WALKeyValueMapper - extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> { + extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> { private byte[] table; @Override @@ -105,7 +108,9 @@ public class WALPlayer extends Configured implements Tool { if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - if (WALEdit.isMetaEditFamily(kv)) continue; + if (WALEdit.isMetaEditFamily(kv)) { + continue; + } context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); } } @@ -148,7 +153,9 @@ public class WALPlayer extends Configured implements Tool { Cell lastCell = null; for (Cell cell : value.getCells()) { // filtering WAL meta entries - if (WALEdit.isMetaEditFamily(cell)) continue; + if (WALEdit.isMetaEditFamily(cell)) { + continue; + } // Allow a subclass filter out this cell. if (filter(context, cell)) { @@ -159,8 +166,12 @@ public class WALPlayer extends Configured implements Tool { if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() || !CellUtil.matchingRow(lastCell, cell)) { // row or type changed, write out aggregate KVs. 
- if (put != null) context.write(tableOut, put); - if (del != null) context.write(tableOut, del); + if (put != null) { + context.write(tableOut, put); + } + if (del != null) { + context.write(tableOut, del); + } if (CellUtil.isDelete(cell)) { del = new Delete(CellUtil.cloneRow(cell)); } else { @@ -176,31 +187,41 @@ public class WALPlayer extends Configured implements Tool { lastCell = cell; } // write residual KVs - if (put != null) context.write(tableOut, put); - if (del != null) context.write(tableOut, del); + if (put != null) { + context.write(tableOut, put); + } + if (del != null) { + context.write(tableOut, del); + } } } catch (InterruptedException e) { e.printStackTrace(); } } - /** - * @param cell - * @return Return true if we are to emit this cell. - */ protected boolean filter(Context context, final Cell cell) { return true; } @Override + protected void + cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) + throws IOException, InterruptedException { + super.cleanup(context); + } + + @Override public void setup(Context context) throws IOException { String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); - if (tablesToUse == null && tableMap == null) { + if (tableMap == null) { + tableMap = tablesToUse; + } + if (tablesToUse == null) { // Then user wants all tables. 
- } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) { + } else if (tablesToUse.length != tableMap.length) { // this can only happen when WALMapper is used directly by a class other than WALPlayer - throw new IOException("No tables or incorrect table mapping specified."); + throw new IOException("Incorrect table mapping specified ."); } int i = 0; if (tablesToUse != null) { @@ -214,7 +235,9 @@ public class WALPlayer extends Configured implements Tool { void setupTime(Configuration conf, String option) throws IOException { String val = conf.get(option); - if (null == val) return; + if (null == val) { + return; + } long ms; try { // first try to parse in user friendly form @@ -243,7 +266,7 @@ public class WALPlayer extends Configured implements Tool { Configuration conf = getConf(); setupTime(conf, WALInputFormat.START_TIME_KEY); setupTime(conf, WALInputFormat.END_TIME_KEY); - Path inputDir = new Path(args[0]); + String inputDirs = args[0]; String[] tables = args[1].split(","); String[] tableMap; if (args.length > 2) { @@ -257,13 +280,18 @@ public class WALPlayer extends Configured implements Tool { } conf.setStrings(TABLES_KEY, tables); conf.setStrings(TABLE_MAP_KEY, tableMap); - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir)); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); job.setJarByClass(WALPlayer.class); - FileInputFormat.setInputPaths(job, inputDir); + + FileInputFormat.addInputPaths(job, inputDirs); + job.setInputFormatClass(WALInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + // the bulk HFile case if (tables.length != 1) { throw new IOException("Exactly one table must be specified for the bulk export option"); @@ -299,7 +327,9 @@ public 
class WALPlayer extends Configured implements Tool { return job; } - /* + + /** + * Print usage * @param errorMsg Error message. Can be null. */ private void usage(final String errorMsg) { @@ -309,7 +339,8 @@ public class WALPlayer extends Configured implements Tool { System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]"); System.err.println("Read all WAL entries for <tables>."); System.err.println("If no tables (\"\") are specific, all tables are imported."); - System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)"); + System.err.println("(Careful, even hbase:meta entries will be imported"+ + " in that case.)"); System.err.println("Otherwise <tables> is a comma separated list of tables.\n"); System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>."); System.err.println("<tableMapping> is a command separated list of targettables."); @@ -322,10 +353,10 @@ public class WALPlayer extends Configured implements Tool { System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]"); System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]"); System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the wal player"); + + "=jobName - use the specified mapreduce job name for the wal player"); System.err.println("For performance also consider the following options:\n" - + " -Dmapreduce.map.speculative=false\n" - + " -Dmapreduce.reduce.speculative=false"); + + " -Dmapreduce.map.speculative=false\n" + + " -Dmapreduce.reduce.speculative=false"); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 177ee32..f86f800 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -74,10 +74,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.*; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; @@ -88,9 +86,129 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockH import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.*; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse.Capability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; @@ -233,6 +351,7 @@ public class MasterRpcServices extends RSRpcServices /** * @return list of blocking services and their security info classes that this server supports */ + @Override protected List<BlockingServiceAndInterface> getServices() { List<BlockingServiceAndInterface> bssi = new ArrayList<>(5); bssi.add(new BlockingServiceAndInterface( http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 54b68d3..55d58e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -62,7 +62,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure.Procedure; import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; @@ -1114,7 +1114,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // setup the default procedure coordinator String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); - ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( + ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java new file mode 100644 index 0000000..b656894 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator} + */ +@InterfaceAudience.Private +public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs { + private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinator.class); + private ZKProcedureUtil zkProc = null; + protected ProcedureCoordinator coordinator = null; // if started this should be non-null + + ZooKeeperWatcher watcher; + String procedureType; + String coordName; + + /** + * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()} + * @param procedureClass procedure type name is a category for when there are multiple kinds of + * procedures.-- this becomes a znode so be aware of the naming restrictions + * @param coordName name of the node running the coordinator + * @throws KeeperException if an unexpected zk error occurs + */ + public ZKProcedureCoordinator(ZooKeeperWatcher watcher, + String procedureClass, String coordName) { + this.watcher = watcher; + this.procedureType = procedureClass; + this.coordName = coordName; + } + + /** + * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. 
If znodes + * appear, first acquire to relevant listener or sets watch waiting for notification of + * the acquire node + * + * @param proc the Procedure + * @param info data to be stored in the acquire node + * @param nodeNames children of the acquire phase + * @throws IOException if any failure occurs. + */ + @Override + final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames) + throws IOException, IllegalArgumentException { + String procName = proc.getName(); + // start watching for the abort node + String abortNode = zkProc.getAbortZNode(procName); + try { + // check to see if the abort node already exists + if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) { + abort(abortNode); + } + // If we get an abort node watch triggered here, we'll go complete creating the acquired + // znode but then handle the acquire znode and bail out + } catch (KeeperException e) { + String msg = "Failed while watching abort node:" + abortNode; + LOG.error(msg, e); + throw new IOException(msg, e); + } + + // create the acquire barrier + String acquire = zkProc.getAcquiredBarrierNode(procName); + LOG.debug("Creating acquire znode:" + acquire); + try { + // notify all the procedure listeners to look for the acquire node + byte[] data = ProtobufUtil.prependPBMagic(info); + ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data); + // loop through all the children of the acquire phase and watch for them + for (String node : nodeNames) { + String znode = ZKUtil.joinZNode(acquire, node); + LOG.debug("Watching for acquire node:" + znode); + if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { + coordinator.memberAcquiredBarrier(procName, node); + } + } + } catch (KeeperException e) { + String msg = "Failed while creating acquire node:" + acquire; + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + + @Override + public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException { + 
String procName = proc.getName(); + String reachedNode = zkProc.getReachedBarrierNode(procName); + LOG.debug("Creating reached barrier zk node:" + reachedNode); + try { + // create the reached znode and watch for the reached znodes + ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode); + // loop through all the children of the acquire phase and watch for them + for (String node : nodeNames) { + String znode = ZKUtil.joinZNode(reachedNode, node); + if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { + byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode); + // ProtobufUtil.isPBMagicPrefix will check null + if (dataFromMember != null && dataFromMember.length > 0) { + if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { + String msg = + "Failed to get data from finished node or data is illegally formatted: " + znode; + LOG.error(msg); + throw new IOException(msg); + } else { + dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), + dataFromMember.length); + coordinator.memberFinishedBarrier(procName, node, dataFromMember); + } + } else { + coordinator.memberFinishedBarrier(procName, node, dataFromMember); + } + } + } + } catch (KeeperException e) { + String msg = "Failed while creating reached node:" + reachedNode; + LOG.error(msg, e); + throw new IOException(msg, e); + } catch (InterruptedException e) { + String msg = "Interrupted while creating reached node:" + reachedNode; + LOG.error(msg, e); + throw new InterruptedIOException(msg); + } + } + + + /** + * Delete znodes that are no longer in use. 
+ */ + @Override + final public void resetMembers(Procedure proc) throws IOException { + String procName = proc.getName(); + boolean stillGettingNotifications = false; + do { + try { + LOG.debug("Attempting to clean out zk node for op:" + procName); + zkProc.clearZNodes(procName); + stillGettingNotifications = false; + } catch (KeeperException.NotEmptyException e) { + // recursive delete isn't transactional (yet) so we need to deal with cases where we get + // children trickling in + stillGettingNotifications = true; + } catch (KeeperException e) { + String msg = "Failed to complete reset procedure " + procName; + LOG.error(msg, e); + throw new IOException(msg, e); + } + } while (stillGettingNotifications); + } + + /** + * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about. + * @return true if succeed, false if encountered initialization errors. + */ + @Override + final public boolean start(final ProcedureCoordinator coordinator) { + if (this.coordinator != null) { + throw new IllegalStateException( + "ZKProcedureCoordinator already started and already has listener installed"); + } + this.coordinator = coordinator; + + try { + this.zkProc = new ZKProcedureUtil(watcher, procedureType) { + @Override + public void nodeCreated(String path) { + if (!isInProcedurePath(path)) return; + LOG.debug("Node created: " + path); + logZKTree(this.baseZNode); + if (isAcquiredPathNode(path)) { + // node wasn't present when we created the watch so zk event triggers acquire + coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), + ZKUtil.getNodeName(path)); + } else if (isReachedPathNode(path)) { + // node was absent when we created the watch so zk event triggers the finished barrier. + + // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. 
+ String procName = ZKUtil.getNodeName(ZKUtil.getParent(path)); + String member = ZKUtil.getNodeName(path); + // get the data from the procedure member + try { + byte[] dataFromMember = ZKUtil.getData(watcher, path); + // ProtobufUtil.isPBMagicPrefix will check null + if (dataFromMember != null && dataFromMember.length > 0) { + if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { + ForeignException ee = new ForeignException(coordName, + "Failed to get data from finished node or data is illegally formatted:" + + path); + coordinator.abortProcedure(procName, ee); + } else { + dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), + dataFromMember.length); + LOG.debug("Finished data from procedure '" + procName + + "' member '" + member + "': " + new String(dataFromMember)); + coordinator.memberFinishedBarrier(procName, member, dataFromMember); + } + } else { + coordinator.memberFinishedBarrier(procName, member, dataFromMember); + } + } catch (KeeperException e) { + ForeignException ee = new ForeignException(coordName, e); + coordinator.abortProcedure(procName, ee); + } catch (InterruptedException e) { + ForeignException ee = new ForeignException(coordName, e); + coordinator.abortProcedure(procName, ee); + } + } else if (isAbortPathNode(path)) { + abort(path); + } else { + LOG.debug("Ignoring created notification for node:" + path); + } + } + }; + zkProc.clearChildZNodes(); + } catch (KeeperException e) { + LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e); + return false; + } + + LOG.debug("Starting the controller for procedure member:" + coordName); + return true; + } + + /** + * This is the abort message being sent by the coordinator to member + * + * TODO this code isn't actually used but can be used to issue a cancellation from the + * coordinator. 
+ */ + @Override + final public void sendAbortToMembers(Procedure proc, ForeignException ee) { + String procName = proc.getName(); + LOG.debug("Aborting procedure '" + procName + "' in zk"); + String procAbortNode = zkProc.getAbortZNode(procName); + try { + LOG.debug("Creating abort znode:" + procAbortNode); + String source = (ee.getSource() == null) ? coordName : ee.getSource(); + byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee)); + // first create the znode for the procedure + ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo); + LOG.debug("Finished creating abort node:" + procAbortNode); + } catch (KeeperException e) { + // possible that we get this error for the procedure if we already reset the zk state, but in + // that case we should still get an error for that procedure anyways + zkProc.logZKTree(zkProc.baseZNode); + coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode + + " to abort procedure '" + procName + "'", new IOException(e)); + } + } + + /** + * Receive a notification and propagate it to the local coordinator + * @param abortNode full znode path to the failed procedure information + */ + protected void abort(String abortNode) { + String procName = ZKUtil.getNodeName(abortNode); + ForeignException ee = null; + try { + byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode); + if (data == null || data.length == 0) { + // ignore + return; + } else if (!ProtobufUtil.isPBMagicPrefix(data)) { + LOG.warn("Got an error notification for op:" + abortNode + + " but we can't read the information. Killing the procedure."); + // we got a remote exception, but we can't describe it + ee = new ForeignException(coordName, + "Data in abort node is illegally formatted. 
ignoring content."); + } else { + + data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length); + ee = ForeignException.deserialize(data); + } + } catch (IOException e) { + LOG.warn("Got an error notification for op:" + abortNode + + " but we can't read the information. Killing the procedure."); + // we got a remote exception, but we can't describe it + ee = new ForeignException(coordName, e); + } catch (KeeperException e) { + coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode + + zkProc.getAbortZnode(), new IOException(e)); + } catch (InterruptedException e) { + coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode + + zkProc.getAbortZnode(), new IOException(e)); + Thread.currentThread().interrupt(); + } + coordinator.abortProcedure(procName, ee); + } + + @Override + final public void close() throws IOException { + zkProc.close(); + } + + /** + * Used in testing + */ + final ZKProcedureUtil getZkProcedureUtil() { + return zkProc; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java deleted file mode 100644 index 4632d23..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.procedure; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.errorhandling.ForeignException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -/** - * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator} - */ -@InterfaceAudience.Private -public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { - private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class); - private ZKProcedureUtil zkProc = null; - protected ProcedureCoordinator coordinator = null; // if started this should be non-null - - ZooKeeperWatcher watcher; - String procedureType; - String coordName; - - /** - * @param watcher zookeeper watcher. 
Owned by <tt>this</tt> and closed via {@link #close()} - * @param procedureClass procedure type name is a category for when there are multiple kinds of - * procedures.-- this becomes a znode so be aware of the naming restrictions - * @param coordName name of the node running the coordinator - * @throws KeeperException if an unexpected zk error occurs - */ - public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher, - String procedureClass, String coordName) throws KeeperException { - this.watcher = watcher; - this.procedureType = procedureClass; - this.coordName = coordName; - } - - /** - * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes - * appear, first acquire to relevant listener or sets watch waiting for notification of - * the acquire node - * - * @param proc the Procedure - * @param info data to be stored in the acquire node - * @param nodeNames children of the acquire phase - * @throws IOException if any failure occurs. - */ - @Override - final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames) - throws IOException, IllegalArgumentException { - String procName = proc.getName(); - // start watching for the abort node - String abortNode = zkProc.getAbortZNode(procName); - try { - // check to see if the abort node already exists - if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) { - abort(abortNode); - } - // If we get an abort node watch triggered here, we'll go complete creating the acquired - // znode but then handle the acquire znode and bail out - } catch (KeeperException e) { - String msg = "Failed while watching abort node:" + abortNode; - LOG.error(msg, e); - throw new IOException(msg, e); - } - - // create the acquire barrier - String acquire = zkProc.getAcquiredBarrierNode(procName); - LOG.debug("Creating acquire znode:" + acquire); - try { - // notify all the procedure listeners to look for the acquire node - byte[] data = 
ProtobufUtil.prependPBMagic(info); - ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data); - // loop through all the children of the acquire phase and watch for them - for (String node : nodeNames) { - String znode = ZKUtil.joinZNode(acquire, node); - LOG.debug("Watching for acquire node:" + znode); - if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { - coordinator.memberAcquiredBarrier(procName, node); - } - } - } catch (KeeperException e) { - String msg = "Failed while creating acquire node:" + acquire; - LOG.error(msg, e); - throw new IOException(msg, e); - } - } - - @Override - public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException { - String procName = proc.getName(); - String reachedNode = zkProc.getReachedBarrierNode(procName); - LOG.debug("Creating reached barrier zk node:" + reachedNode); - try { - // create the reached znode and watch for the reached znodes - ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode); - // loop through all the children of the acquire phase and watch for them - for (String node : nodeNames) { - String znode = ZKUtil.joinZNode(reachedNode, node); - if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { - byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode); - // ProtobufUtil.isPBMagicPrefix will check null - if (dataFromMember != null && dataFromMember.length > 0) { - if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { - String msg = - "Failed to get data from finished node or data is illegally formatted: " + znode; - LOG.error(msg); - throw new IOException(msg); - } else { - dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), - dataFromMember.length); - coordinator.memberFinishedBarrier(procName, node, dataFromMember); - } - } else { - coordinator.memberFinishedBarrier(procName, node, dataFromMember); - } - } - } - } catch (KeeperException e) { - String msg = "Failed while creating reached node:" + reachedNode; 
- LOG.error(msg, e); - throw new IOException(msg, e); - } catch (InterruptedException e) { - String msg = "Interrupted while creating reached node:" + reachedNode; - LOG.error(msg, e); - throw new InterruptedIOException(msg); - } - } - - - /** - * Delete znodes that are no longer in use. - */ - @Override - final public void resetMembers(Procedure proc) throws IOException { - String procName = proc.getName(); - boolean stillGettingNotifications = false; - do { - try { - LOG.debug("Attempting to clean out zk node for op:" + procName); - zkProc.clearZNodes(procName); - stillGettingNotifications = false; - } catch (KeeperException.NotEmptyException e) { - // recursive delete isn't transactional (yet) so we need to deal with cases where we get - // children trickling in - stillGettingNotifications = true; - } catch (KeeperException e) { - String msg = "Failed to complete reset procedure " + procName; - LOG.error(msg, e); - throw new IOException(msg, e); - } - } while (stillGettingNotifications); - } - - /** - * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about. - * @return true if succeed, false if encountered initialization errors. 
- */ - final public boolean start(final ProcedureCoordinator coordinator) { - if (this.coordinator != null) { - throw new IllegalStateException( - "ZKProcedureCoordinator already started and already has listener installed"); - } - this.coordinator = coordinator; - - try { - this.zkProc = new ZKProcedureUtil(watcher, procedureType) { - @Override - public void nodeCreated(String path) { - if (!isInProcedurePath(path)) return; - LOG.debug("Node created: " + path); - logZKTree(this.baseZNode); - if (isAcquiredPathNode(path)) { - // node wasn't present when we created the watch so zk event triggers acquire - coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), - ZKUtil.getNodeName(path)); - } else if (isReachedPathNode(path)) { - // node was absent when we created the watch so zk event triggers the finished barrier. - - // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. - String procName = ZKUtil.getNodeName(ZKUtil.getParent(path)); - String member = ZKUtil.getNodeName(path); - // get the data from the procedure member - try { - byte[] dataFromMember = ZKUtil.getData(watcher, path); - // ProtobufUtil.isPBMagicPrefix will check null - if (dataFromMember != null && dataFromMember.length > 0) { - if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { - ForeignException ee = new ForeignException(coordName, - "Failed to get data from finished node or data is illegally formatted:" - + path); - coordinator.abortProcedure(procName, ee); - } else { - dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), - dataFromMember.length); - LOG.debug("Finished data from procedure '" + procName - + "' member '" + member + "': " + new String(dataFromMember)); - coordinator.memberFinishedBarrier(procName, member, dataFromMember); - } - } else { - coordinator.memberFinishedBarrier(procName, member, dataFromMember); - } - } catch (KeeperException e) { - ForeignException ee = new 
ForeignException(coordName, e); - coordinator.abortProcedure(procName, ee); - } catch (InterruptedException e) { - ForeignException ee = new ForeignException(coordName, e); - coordinator.abortProcedure(procName, ee); - } - } else if (isAbortPathNode(path)) { - abort(path); - } else { - LOG.debug("Ignoring created notification for node:" + path); - } - } - }; - zkProc.clearChildZNodes(); - } catch (KeeperException e) { - LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e); - return false; - } - - LOG.debug("Starting the controller for procedure member:" + coordName); - return true; - } - - /** - * This is the abort message being sent by the coordinator to member - * - * TODO this code isn't actually used but can be used to issue a cancellation from the - * coordinator. - */ - @Override - final public void sendAbortToMembers(Procedure proc, ForeignException ee) { - String procName = proc.getName(); - LOG.debug("Aborting procedure '" + procName + "' in zk"); - String procAbortNode = zkProc.getAbortZNode(procName); - try { - LOG.debug("Creating abort znode:" + procAbortNode); - String source = (ee.getSource() == null) ? 
coordName : ee.getSource(); - byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee)); - // first create the znode for the procedure - ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo); - LOG.debug("Finished creating abort node:" + procAbortNode); - } catch (KeeperException e) { - // possible that we get this error for the procedure if we already reset the zk state, but in - // that case we should still get an error for that procedure anyways - zkProc.logZKTree(zkProc.baseZNode); - coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode - + " to abort procedure '" + procName + "'", new IOException(e)); - } - } - - /** - * Receive a notification and propagate it to the local coordinator - * @param abortNode full znode path to the failed procedure information - */ - protected void abort(String abortNode) { - String procName = ZKUtil.getNodeName(abortNode); - ForeignException ee = null; - try { - byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode); - if (data == null || data.length == 0) { - // ignore - return; - } else if (!ProtobufUtil.isPBMagicPrefix(data)) { - LOG.warn("Got an error notification for op:" + abortNode - + " but we can't read the information. Killing the procedure."); - // we got a remote exception, but we can't describe it - ee = new ForeignException(coordName, - "Data in abort node is illegally formatted. ignoring content."); - } else { - - data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length); - ee = ForeignException.deserialize(data); - } - } catch (IOException e) { - LOG.warn("Got an error notification for op:" + abortNode - + " but we can't read the information. 
Killing the procedure."); - // we got a remote exception, but we can't describe it - ee = new ForeignException(coordName, e); - } catch (KeeperException e) { - coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode - + zkProc.getAbortZnode(), new IOException(e)); - } catch (InterruptedException e) { - coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode - + zkProc.getAbortZnode(), new IOException(e)); - Thread.currentThread().interrupt(); - } - coordinator.abortProcedure(procName, ee); - } - - @Override - final public void close() throws IOException { - zkProc.close(); - } - - /** - * Used in testing - */ - final ZKProcedureUtil getZkProcedureUtil() { - return zkProc; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 7b624a5..3092114 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure.Procedure; import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.util.Pair; import 
org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -98,7 +98,7 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { // setup the procedure coordinator String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads); - ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( + ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( master.getZooKeeper(), getProcedureSignature(), name); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index be4cca0..b3b5113 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -199,13 +199,13 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; +import sun.misc.Signal; +import sun.misc.SignalHandler; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -import sun.misc.Signal; -import sun.misc.SignalHandler; - /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. @@ -385,7 +385,7 @@ public class HRegionServer extends HasThread implements // WAL roller. 
log is protected rather than private to avoid // eclipse warning when accessed by inner classes - final LogRoller walRoller; + protected final LogRoller walRoller; // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -535,7 +535,6 @@ public class HRegionServer extends HasThread implements // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); - // Config'ed params this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); @@ -640,7 +639,7 @@ public class HRegionServer extends HasThread implements int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); this.compactedFileDischarger = - new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this); + new CompactedHFilesDischarger(cleanerInterval, this, this); choreService.scheduleChore(compactedFileDischarger); } @@ -859,7 +858,7 @@ public class HRegionServer extends HasThread implements rspmHost.loadProcedures(conf); rspmHost.initialize(this); } catch (KeeperException e) { - this.abort("Failed to reach zk cluster when creating procedure handler.", e); + this.abort("Failed to reach coordination cluster when creating procedure handler.", e); } // register watcher for recovering regions this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this); @@ -1948,6 +1947,10 @@ public class HRegionServer extends HasThread implements return wal; } + public LogRoller getWalRoller() { + return walRoller; + } + @Override public Connection getConnection() { return getClusterConnection(); http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 9dd85d8..bf14933 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -368,6 +368,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen return false; } + public static boolean isArchivedLogFile(Path p) { + String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR; + return p.toString().contains(oldLog); + } + /** * Get prefix of the log from its name, assuming WAL name in format of * log_prefix.filenumber.log_suffix http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index a6c7f68..8a4ed72 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -49,7 +49,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import edu.umd.cs.findbugs.annotations.Nullable; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; @@ -117,6 +116,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm; import 
org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; @@ -137,6 +137,8 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * Facility for testing HBase. Replacement for * old HBaseTestCase and HBaseClusterTestCase functionality. @@ -2171,6 +2173,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } + public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) + throws IOException { + Random r = new Random(); + byte[] row = new byte[rowSize]; + for (int i = 0; i < totalRows; i++) { + r.nextBytes(row); + Put put = new Put(row); + put.addColumn(f, new byte[]{0}, new byte[]{0}); + t.put(put); + } + } + public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, int replicaId) throws IOException { @@ -3337,6 +3351,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** + * Wait until all system tables' regions get assigned + * @throws IOException + */ + public void waitUntilAllSystemRegionsAssigned() throws IOException { + waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); + waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME); + } + + /** * Wait until all regions for a table in hbase:meta have a non-empty * info:server, or until timeout.
This means all regions have been deployed, * master has been informed and updated hbase:meta with the regions deployed @@ -3801,12 +3824,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public static int createPreSplitLoadTestTable(Configuration conf, HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException { + + return createPreSplitLoadTestTable(conf, desc, hcds, + new RegionSplitter.HexStringSplit(), numRegionsPerServer); + } + + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. + * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + HTableDescriptor desc, HColumnDescriptor[] hcds, + SplitAlgorithm splitter, int numRegionsPerServer) throws IOException { for (HColumnDescriptor hcd : hcds) { if (!desc.hasFamily(hcd.getName())) { desc.addFamily(hcd); } } - int totalNumberOfRegions = 0; Connection unmanagedConnection = ConnectionFactory.createConnection(conf); Admin admin = unmanagedConnection.getAdmin(); @@ -3825,7 +3860,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: " + numRegionsPerServer + ")"); - byte[][] splits = new RegionSplitter.HexStringSplit().split( + byte[][] splits = splitter.split( totalNumberOfRegions); admin.createTable(desc, splits);