Revert "KYLIN-2788 fix bug in HFileOutputFormat2" This reverts commit dc8203b2a544ff48ee082dfea5d050bac9cabd17.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c9a55c9f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c9a55c9f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c9a55c9f Branch: refs/heads/master Commit: c9a55c9fde8f8bfaafaf1277311c28cf06131ceb Parents: 22631a6 Author: Shaofeng Shi <shaofeng...@gmail.com> Authored: Tue Sep 26 18:27:03 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Tue Sep 26 20:51:26 2017 +0800 ---------------------------------------------------------------------- .../kylin/storage/hbase/steps/CubeHFileJob.java | 3 +- .../storage/hbase/steps/HFileOutputFormat3.java | 733 ------------------- 2 files changed, 2 insertions(+), 734 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c9a55c9f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 7dc67e5..1a624c4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.SequenceFile; @@ -94,7 +95,7 @@ public class CubeHFileJob extends AbstractHadoopJob { HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase()); // Automatic config ! - HFileOutputFormat3.configureIncrementalLoad(job, htable); + HFileOutputFormat.configureIncrementalLoad(job, htable); reconfigurePartitions(hbaseConf, partitionFilePath); // set block replication to 3 for hfiles http://git-wip-us.apache.org/repos/asf/kylin/blob/c9a55c9f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java deleted file mode 100644 index d50166e..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java +++ /dev/null @@ -1,733 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.storage.hbase.steps; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.hbase.mapreduce.MutationSerialization; -import org.apache.hadoop.hbase.mapreduce.PutSortReducer; -import org.apache.hadoop.hbase.mapreduce.ResultSerialization; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TextSortReducer; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788 - * - * Writes HFiles. Passed Cells must arrive in order. - * Writes current time as the sequence id for the file. Sets the major compacted - * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll - * all HFiles being written. - * <p> - * Using this class as part of a MapReduce job is best done - * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class HFileOutputFormat3 - extends FileOutputFormat<ImmutableBytesWritable, Cell> { - static Log LOG = LogFactory.getLog(HFileOutputFormat3.class); - - // The following constants are private since these are used by - // HFileOutputFormat2 to internally transfer data between job setup and - // reducer run using conf. - // These should not be changed by the client. - private static final String COMPRESSION_FAMILIES_CONF_KEY = - "hbase.hfileoutputformat.families.compression"; - private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = - "hbase.hfileoutputformat.families.bloomtype"; - private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = - "hbase.mapreduce.hfileoutputformat.blocksize"; - private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = - "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; - - // This constant is public since the client can modify this when setting - // up their conf object and thus refer to this symbol. - // It is present for backwards compatibility reasons. Use it only to - // override the auto-detection of datablock encoding. - public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = - "hbase.mapreduce.hfileoutputformat.datablock.encoding"; - - @Override - public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter( - final TaskAttemptContext context) throws IOException, InterruptedException { - return createRecordWriter(context); - } - - static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> - createRecordWriter(final TaskAttemptContext context) - throws IOException, InterruptedException { - - final Configuration conf = context.getConfiguration(); - Path outputPath = FileOutputFormat.getOutputPath(context); - // Get the path of the temporary output file - FileOutputCommitter committer = (FileOutputCommitter) ReflectionUtils.newInstance(conf.getClass( - "mapreduce.output.committer.class", FileOutputCommitter.class, - org.apache.hadoop.mapreduce.OutputCommitter.class), outputPath, context); - - final Path outputdir = committer.getWorkPath(); - LOG.debug("Task output path: " + outputdir); - final FileSystem fs = outputdir.getFileSystem(conf); - // These configs. are from hbase-*.xml - final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, - HConstants.DEFAULT_MAX_FILE_SIZE); - // Invented config. Add to hbase-*.xml if other than default compression. - final String defaultCompressionStr = conf.get("hfile.compression", - Compression.Algorithm.NONE.getName()); - final Algorithm defaultCompression = AbstractHFileWriter - .compressionByName(defaultCompressionStr); - final boolean compactionExclude = conf.getBoolean( - "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); - - // create a map from column family to the compression algorithm - final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); - final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); - final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); - - String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); - final Map<byte[], DataBlockEncoding> datablockEncodingMap - = createFamilyDataBlockEncodingMap(conf); - final DataBlockEncoding overriddenEncoding; - if (dataBlockEncodingStr != null) { - overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); - } else { - overriddenEncoding = null; - } - - return new RecordWriter<ImmutableBytesWritable, V>() { - // Map of families to writers and how much has been output on the writer. - private final Map<byte [], WriterLength> writers = - new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR); - private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); - private boolean rollRequested = false; - - @Override - public void write(ImmutableBytesWritable row, V cell) - throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - if (row == null && kv == null) { - rollWriters(); - return; - } - byte [] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); - byte [] family = CellUtil.cloneFamily(kv); - WriterLength wl = this.writers.get(family); - if (wl == null) { - fs.mkdirs(new Path(outputdir, Bytes.toString(family))); - } - if (wl != null && wl.written + length >= maxsize) { - this.rollRequested = true; - } - if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { - rollWriters(); - } - if (wl == null || wl.writer == null) { - wl = getNewWriter(family, conf); - } - kv.updateLatestStamp(this.now); - wl.writer.append(kv); - wl.written += length; - this.previousRow = rowKey; - } - - private void rollWriters() throws IOException { - for (WriterLength wl : this.writers.values()) { - if (wl.writer != null) { - LOG.info("Writer=" + wl.writer.getPath() + - ((wl.written == 0)? "": ", wrote=" + wl.written)); - close(wl.writer); - } - wl.writer = null; - wl.written = 0; - } - this.rollRequested = false; - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED", - justification="Not important") - private WriterLength getNewWriter(byte[] family, Configuration conf) - throws IOException { - WriterLength wl = new WriterLength(); - Path familydir = new Path(outputdir, Bytes.toString(family)); - Algorithm compression = compressionMap.get(family); - compression = compression == null ? defaultCompression : compression; - BloomType bloomType = bloomTypeMap.get(family); - bloomType = bloomType == null ? BloomType.NONE : bloomType; - Integer blockSize = blockSizeMap.get(family); - blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; - DataBlockEncoding encoding = overriddenEncoding; - encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; - encoding = encoding == null ? DataBlockEncoding.NONE : encoding; - Configuration tempConf = new Configuration(conf); - tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - HFileContextBuilder contextBuilder = new HFileContextBuilder() - .withCompression(compression) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) - .withBlockSize(blockSize); - contextBuilder.withDataBlockEncoding(encoding); - HFileContext hFileContext = contextBuilder.build(); - - wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) - .withOutputDir(familydir).withBloomType(bloomType) - .withComparator(KeyValue.COMPARATOR) - .withFileContext(hFileContext).build(); - - this.writers.put(family, wl); - return wl; - } - - private void close(final StoreFile.Writer w) throws IOException { - if (w != null) { - w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())); - w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(context.getTaskAttemptID().toString())); - w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)); - w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)); - w.appendTrackedTimestampsToMetadata(); - w.close(); - } - } - - @Override - public void close(TaskAttemptContext c) - throws IOException, InterruptedException { - for (WriterLength wl: this.writers.values()) { - close(wl.writer); - } - } - }; - } - - /* - * Data structure to hold a Writer and amount of data written on it. - */ - static class WriterLength { - long written = 0; - StoreFile.Writer writer = null; - } - - /** - * Return the start keys of all of the regions in this table, - * as a list of ImmutableBytesWritable. - */ - private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) - throws IOException { - byte[][] byteKeys = table.getStartKeys(); - ArrayList<ImmutableBytesWritable> ret = - new ArrayList<ImmutableBytesWritable>(byteKeys.length); - for (byte[] byteKey : byteKeys) { - ret.add(new ImmutableBytesWritable(byteKey)); - } - return ret; - } - - /** - * Write out a {@link SequenceFile} that can be read by - * {@link TotalOrderPartitioner} that contains the split points in startKeys. - */ - @SuppressWarnings("deprecation") - private static void writePartitions(Configuration conf, Path partitionsPath, - List<ImmutableBytesWritable> startKeys) throws IOException { - LOG.info("Writing partition information to " + partitionsPath); - if (startKeys.isEmpty()) { - throw new IllegalArgumentException("No regions passed"); - } - - // We're generating a list of split points, and we don't ever - // have keys < the first region (which has an empty start key) - // so we need to remove it. Otherwise we would end up with an - // empty reducer with index 0 - TreeSet<ImmutableBytesWritable> sorted = - new TreeSet<ImmutableBytesWritable>(startKeys); - - ImmutableBytesWritable first = sorted.first(); - if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { - throw new IllegalArgumentException( - "First region of table should have empty start key. Instead has: " - + Bytes.toStringBinary(first.get())); - } - sorted.remove(first); - - // Write the actual file - FileSystem fs = partitionsPath.getFileSystem(conf); - SequenceFile.Writer writer = SequenceFile.createWriter( - fs, conf, partitionsPath, ImmutableBytesWritable.class, - NullWritable.class); - - try { - for (ImmutableBytesWritable startKey : sorted) { - writer.append(startKey, NullWritable.get()); - } - } finally { - writer.close(); - } - } - - /** - * Configure a MapReduce Job to perform an incremental load into the given - * table. This - * <ul> - * <li>Inspects the table to configure a total order partitioner</li> - * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> - * <li>Sets the number of reduce tasks to match the current number of regions</li> - * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> - * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or - * PutSortReducer)</li> - * </ul> - * The user should be sure to set the map output value class to either KeyValue or Put before - * running this function. - * - * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead. - */ - @Deprecated - public static void configureIncrementalLoad(Job job, HTable table) - throws IOException { - configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator()); - } - - /** - * Configure a MapReduce Job to perform an incremental load into the given - * table. This - * <ul> - * <li>Inspects the table to configure a total order partitioner</li> - * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> - * <li>Sets the number of reduce tasks to match the current number of regions</li> - * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> - * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or - * PutSortReducer)</li> - * </ul> - * The user should be sure to set the map output value class to either KeyValue or Put before - * running this function. - */ - public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) - throws IOException { - configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); - } - - /** - * Configure a MapReduce Job to perform an incremental load into the given - * table. This - * <ul> - * <li>Inspects the table to configure a total order partitioner</li> - * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> - * <li>Sets the number of reduce tasks to match the current number of regions</li> - * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> - * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or - * PutSortReducer)</li> - * </ul> - * The user should be sure to set the map output value class to either KeyValue or Put before - * running this function. - */ - public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, - RegionLocator regionLocator) throws IOException { - configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class); - } - - static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, - RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException, - UnsupportedEncodingException { - Configuration conf = job.getConfiguration(); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); - job.setOutputFormatClass(cls); - - // Based on the configured map output class, set the correct reducer to properly - // sort the incoming values. - // TODO it would be nice to pick one or the other of these formats. - if (KeyValue.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(KeyValueSortReducer.class); - } else if (Put.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(PutSortReducer.class); - } else if (Text.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(TextSortReducer.class); - } else { - LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); - } - - conf.setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - - // Use table's region boundaries for TOP split points. - LOG.info("Looking up current regions for table " + tableDescriptor.getTableName()); - List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator); - LOG.info("Configuring " + startKeys.size() + " reduce partitions " + - "to match current region count"); - job.setNumReduceTasks(startKeys.size()); - - configurePartitioner(job, startKeys); - // Set compression algorithms based on column families - configureCompression(conf, tableDescriptor); - configureBloomType(tableDescriptor, conf); - configureBlockSize(tableDescriptor, conf); - configureDataBlockEncoding(tableDescriptor, conf); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + regionLocator.getName() + " output configured."); - } - - public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { - Configuration conf = job.getConfiguration(); - - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); - job.setOutputFormatClass(HFileOutputFormat3.class); - - // Set compression algorithms based on column families - configureCompression(conf, table.getTableDescriptor()); - configureBloomType(table.getTableDescriptor(), conf); - configureBlockSize(table.getTableDescriptor(), conf); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); - configureDataBlockEncoding(tableDescriptor, conf); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + table.getName() + " output configured."); - } - - /** - * Runs inside the task to deserialize column family to compression algorithm - * map from the configuration. - * - * @param conf to read the serialized values from - * @return a map from column family to the configured compression algorithm - */ - @VisibleForTesting - static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration - conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, - COMPRESSION_FAMILIES_CONF_KEY); - Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], - Algorithm>(Bytes.BYTES_COMPARATOR); - for (Map.Entry<byte[], String> e : stringMap.entrySet()) { - Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue()); - compressionMap.put(e.getKey(), algorithm); - } - return compressionMap; - } - - /** - * Runs inside the task to deserialize column family to bloom filter type - * map from the configuration. - * - * @param conf to read the serialized values from - * @return a map from column family to the the configured bloom filter type - */ - @VisibleForTesting - static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, - BLOOM_TYPE_FAMILIES_CONF_KEY); - Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], - BloomType>(Bytes.BYTES_COMPARATOR); - for (Map.Entry<byte[], String> e : stringMap.entrySet()) { - BloomType bloomType = BloomType.valueOf(e.getValue()); - bloomTypeMap.put(e.getKey(), bloomType); - } - return bloomTypeMap; - } - - /** - * Runs inside the task to deserialize column family to block size - * map from the configuration. - * - * @param conf to read the serialized values from - * @return a map from column family to the configured block size - */ - @VisibleForTesting - static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, - BLOCK_SIZE_FAMILIES_CONF_KEY); - Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], - Integer>(Bytes.BYTES_COMPARATOR); - for (Map.Entry<byte[], String> e : stringMap.entrySet()) { - Integer blockSize = Integer.parseInt(e.getValue()); - blockSizeMap.put(e.getKey(), blockSize); - } - return blockSizeMap; - } - - /** - * Runs inside the task to deserialize column family to data block encoding - * type map from the configuration. - * - * @param conf to read the serialized values from - * @return a map from column family to HFileDataBlockEncoder for the - * configured data block type for the family - */ - @VisibleForTesting - static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap( - Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, - DATABLOCK_ENCODING_FAMILIES_CONF_KEY); - Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], - DataBlockEncoding>(Bytes.BYTES_COMPARATOR); - for (Map.Entry<byte[], String> e : stringMap.entrySet()) { - encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue()))); - } - return encoderMap; - } - - - /** - * Run inside the task to deserialize column family to given conf value map. - * - * @param conf to read the serialized values from - * @param confName conf key to read from the configuration - * @return a map of column family to the given configuration value - */ - private static Map<byte[], String> createFamilyConfValueMap( - Configuration conf, String confName) { - Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR); - String confVal = conf.get(confName, ""); - for (String familyConf : confVal.split("&")) { - String[] familySplit = familyConf.split("="); - if (familySplit.length != 2) { - continue; - } - try { - confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(), - URLDecoder.decode(familySplit[1], "UTF-8")); - } catch (UnsupportedEncodingException e) { - // will not happen with UTF-8 encoding - throw new AssertionError(e); - } - } - return confValMap; - } - - /** - * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against - * <code>splitPoints</code>. Cleans up the partitions file after job exists. - */ - static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) - throws IOException { - Configuration conf = job.getConfiguration(); - // create the partitions file - FileSystem fs = FileSystem.get(conf); - Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + UUID.randomUUID()); - fs.makeQualified(partitionsPath); - writePartitions(conf, partitionsPath, splitPoints); - fs.deleteOnExit(partitionsPath); - - // configure job to use it - job.setPartitionerClass(TotalOrderPartitioner.class); - TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); - } - - /** - * Serialize column family to compression algorithm map to configuration. - * Invoked while configuring the MR job for incremental load. - * - * @param table to read the properties from - * @param conf to persist serialized values into - * @throws IOException - * on failure to read column family descriptors - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") - @VisibleForTesting - static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor) - throws UnsupportedEncodingException { - StringBuilder compressionConfigValue = new StringBuilder(); - if(tableDescriptor == null){ - // could happen with mock table instance - return; - } - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - compressionConfigValue.append('&'); - } - compressionConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - compressionConfigValue.append('='); - compressionConfigValue.append(URLEncoder.encode( - familyDescriptor.getCompression().getName(), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString()); - } - - /** - * Serialize column family to block size map to configuration. - * Invoked while configuring the MR job for incremental load. - * @param tableDescriptor to read the properties from - * @param conf to persist serialized values into - * - * @throws IOException - * on failure to read column family descriptors - */ - @VisibleForTesting - static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - StringBuilder blockSizeConfigValue = new StringBuilder(); - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - blockSizeConfigValue.append('&'); - } - blockSizeConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - blockSizeConfigValue.append('='); - blockSizeConfigValue.append(URLEncoder.encode( - String.valueOf(familyDescriptor.getBlocksize()), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString()); - } - - /** - * Serialize column family to bloom type map to configuration. - * Invoked while configuring the MR job for incremental load. - * @param tableDescriptor to read the properties from - * @param conf to persist serialized values into - * - * @throws IOException - * on failure to read column family descriptors - */ - @VisibleForTesting - static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - StringBuilder bloomTypeConfigValue = new StringBuilder(); - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - bloomTypeConfigValue.append('&'); - } - bloomTypeConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - bloomTypeConfigValue.append('='); - String bloomType = familyDescriptor.getBloomFilterType().toString(); - if (bloomType == null) { - bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; - } - bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); - } - conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString()); - } - - /** - * Serialize column family to data block encoding map to configuration. - * Invoked while configuring the MR job for incremental load. - * - * @param table to read the properties from - * @param conf to persist serialized values into - * @throws IOException - * on failure to read column family descriptors - */ - @VisibleForTesting - static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, - Configuration conf) throws UnsupportedEncodingException { - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - StringBuilder dataBlockEncodingConfigValue = new StringBuilder(); - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - dataBlockEncodingConfigValue.append('&'); - } - dataBlockEncodingConfigValue.append( - URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - dataBlockEncodingConfigValue.append('='); - DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); - if (encoding == null) { - encoding = DataBlockEncoding.NONE; - } - dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), - "UTF-8")); - } - conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, - dataBlockEncodingConfigValue.toString()); - } -}