Repository: kylin Updated Branches: refs/heads/KYLIN-2992 [created] b837071a6
KYLIN-2992 Avoid GC in CubeHFileJob.Reducer Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b837071a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b837071a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b837071a Branch: refs/heads/KYLIN-2992 Commit: b837071a6048433a0ec1708f358a62a8e90c2d1a Parents: 31b16c3 Author: kangkaisen <kangkai...@meituan.com> Authored: Fri Sep 22 13:36:23 2017 +0800 Committer: kangkaisen <kangkai...@meituan.com> Committed: Fri Nov 3 17:33:52 2017 +0800 ---------------------------------------------------------------------- .../storage/hbase/steps/CreateHTableJob.java | 8 +- .../kylin/storage/hbase/steps/CubeHFileJob.java | 26 ++++-- .../storage/hbase/steps/CubeHFileMapper.java | 14 ++-- .../storage/hbase/steps/RowKeyWritable.java | 86 ++++++++++++++++++++ .../hbase/steps/CubeHFileMapper2Test.java | 4 - .../hbase/steps/CubeHFileMapperTest.java | 8 +- 6 files changed, 123 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index b2bab47..c41df06 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -29,7 +29,7 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.KeyValue; import 
org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; @@ -323,6 +323,7 @@ public class CreateHTableJob extends AbstractHadoopJob { double accumulatedSize = 0; int j = 0; for (Long cuboid : allCuboids) { + if (accumulatedSize >= hfileSizeMB) { logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize)); byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN]; @@ -337,10 +338,11 @@ public class CreateHTableJob extends AbstractHadoopJob { } - SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(ImmutableBytesWritable.class), SequenceFile.Writer.valueClass(NullWritable.class)); + SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class)); for (int i = 0; i < splits.size(); i++) { - hfilePartitionWriter.append(new ImmutableBytesWritable(splits.get(i)), NullWritable.get()); + //when we compare the rowkey, we compare the row firstly. 
+ hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get()); } hfilePartitionWriter.close(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 5f51c13..093e8ee 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -25,8 +25,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; @@ -39,6 +40,7 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -81,10 +83,6 @@ public class CubeHFileJob extends AbstractHadoopJob { addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); FileOutputFormat.setOutputPath(job, output); - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapperClass(CubeHFileMapper.class); - 
job.setReducerClass(KeyValueSortReducer.class); - // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); // add metadata to distributed cache @@ -97,6 +95,13 @@ public class CubeHFileJob extends AbstractHadoopJob { HFileOutputFormat3.configureIncrementalLoad(job, htable); reconfigurePartitions(hbaseConf, partitionFilePath); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(CubeHFileMapper.class); + job.setReducerClass(KeyValueReducer.class); + job.setMapOutputKeyClass(RowKeyWritable.class); + job.setMapOutputValueClass(KeyValue.class); + job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class); + // set block replication to 3 for hfiles hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); @@ -109,6 +114,17 @@ public class CubeHFileJob extends AbstractHadoopJob { } } + private static class KeyValueReducer extends KylinReducer<RowKeyWritable, KeyValue, ImmutableBytesWritable, KeyValue> { + private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(); + @Override + public void doReduce(RowKeyWritable row, Iterable<KeyValue> kvs, Context context) throws java.io.IOException, InterruptedException { + for (KeyValue kv : kvs) { + immutableBytesWritable.set(kv.getKey()); + context.write(immutableBytesWritable, kv); + } + } + } + /** * Check if there's partition files for hfile, if yes replace the table splits, to make the job more reducers * @param conf the job configuration http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java index f3d9f69..260db8d 100644 --- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; @@ -41,9 +40,7 @@ import com.google.common.collect.Lists; * @author George Song (ysong1) * */ -public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWritable, KeyValue> { - - ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); +public class CubeHFileMapper extends KylinMapper<Text, Text, RowKeyWritable, KeyValue> { String cubeName; CubeDesc cubeDesc; @@ -52,6 +49,8 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita Object[] inputMeasures; List<KeyValueCreator> keyValueCreators; + private RowKeyWritable rowKeyWritable = new RowKeyWritable(); + @Override protected void doSetup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); @@ -75,14 +74,14 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita @Override public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { - outputKey.set(key.getBytes(), 0, key.getLength()); KeyValue outputValue; int n = keyValueCreators.size(); if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength()); - context.write(outputKey, outputValue); + rowKeyWritable.set(outputValue.createKeyOnly(false).getKey()); + context.write(rowKeyWritable, outputValue); } else { // normal (complex) case that distributes measures to multiple HBase columns @@ -90,7 +89,8 @@ public class CubeHFileMapper 
extends KylinMapper<Text, Text, ImmutableBytesWrita for (int i = 0; i < n; i++) { outputValue = keyValueCreators.get(i).create(key, inputMeasures); - context.write(outputKey, outputValue); + rowKeyWritable.set(outputValue.createKeyOnly(false).getKey()); + context.write(rowKeyWritable, outputValue); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java new file mode 100644 index 0000000..599a3f4 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Map-output key of CubeHFileJob: the serialized key part of an HBase {@link KeyValue}
+ * (as produced by {@code KeyValue#getKey()}), ordered with {@link KeyValue.KVComparator}.
+ * CubeHFileJob registers {@link RowKeyComparator} as the shuffle sort comparator, so the
+ * reducer receives KeyValues already in KeyValue order.
+ *
+ * <p>Serialized form (see {@link #write(DataOutput)}): a 4-byte length followed by that many
+ * key bytes. {@link RowKeyComparator} depends on this layout.
+ *
+ * <p>{@link #set(byte[], int, int)} keeps a reference to the caller's array (no copy), so one
+ * instance can be reused for every record without allocating. Instances are mutable.
+ */
+public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
+    private static final byte[] EMPTY_BYTES = new byte[0];
+
+    // Shared instead of one per RowKeyWritable: readFields() runs on a new instance for every
+    // key read back during the shuffle, so a per-instance comparator is pure garbage.
+    // NOTE(review): assumes KVComparator keeps no mutable per-call state (the original
+    // RowKeyComparator already reused a single instance for every comparison).
+    private static final KeyValue.KVComparator KV_COMPARATOR = new KeyValue.KVComparator();
+
+    // Empty rather than null so write()/equals()/hashCode() are safe on a fresh instance.
+    private byte[] data = EMPTY_BYTES;
+    private int offset;
+    private int length;
+
+    static {
+        WritableComparator.define(RowKeyWritable.class, new RowKeyComparator());
+    }
+
+    /** Creates an empty key; Hadoop uses this before calling {@link #readFields(DataInput)}. */
+    public RowKeyWritable() {
+        super();
+    }
+
+    /**
+     * Wraps all of {@code bytes} without copying.
+     *
+     * @param bytes a serialized KeyValue key
+     */
+    public RowKeyWritable(byte[] bytes) {
+        set(bytes);
+    }
+
+    /**
+     * Points this key at all of {@code array} without copying.
+     *
+     * @param array the key bytes
+     */
+    public void set(byte[] array) {
+        set(array, 0, array.length);
+    }
+
+    /**
+     * Points this key at {@code array[offset, offset + length)} without copying; the caller must
+     * not modify that range while this key is still in use.
+     *
+     * @param array  backing array
+     * @param offset index of the first key byte
+     * @param length number of key bytes
+     */
+    public void set(byte[] array, int offset, int length) {
+        this.data = array;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    /** @return the backing array (not a copy); the key starts at {@link #getOffset()} */
+    public byte[] getData() {
+        return data;
+    }
+
+    /** @return index of the first key byte in {@link #getData()} */
+    public int getOffset() {
+        return offset;
+    }
+
+    /** @return number of key bytes */
+    public int getLength() {
+        return length;
+    }
+
+    /** Writes a 4-byte length followed by the key bytes. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.length);
+        out.write(this.data, this.offset, this.length);
+    }
+
+    /**
+     * Reads the form produced by {@link #write(DataOutput)}. Always allocates a fresh array:
+     * the current one may belong to a caller of {@link #set(byte[], int, int)} and must not be
+     * overwritten.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.length = in.readInt();
+        this.data = new byte[this.length];
+        in.readFully(this.data, 0, this.length);
+        this.offset = 0;
+    }
+
+    /** Orders keys with HBase's {@link KeyValue.KVComparator}. */
+    @Override
+    public int compareTo(RowKeyWritable other) {
+        return KV_COMPARATOR.compare(this.data, this.offset, this.length, other.data, other.offset, other.length);
+    }
+
+    /**
+     * Two keys are equal when their key bytes are identical. Without this override the
+     * Object-identity default makes equal keys unequal (hash-based grouping, test assertions).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof RowKeyWritable)) {
+            return false;
+        }
+        RowKeyWritable that = (RowKeyWritable) obj;
+        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length) == 0;
+    }
+
+    /** Hash of the key bytes, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return WritableComparator.hashBytes(this.data, this.offset, this.length);
+    }
+
+    /**
+     * Raw comparator for the shuffle sort: compares two serialized RowKeyWritables in place,
+     * skipping the 4-byte length prefix written by {@link RowKeyWritable#write(DataOutput)}.
+     */
+    public static class RowKeyComparator extends WritableComparator {
+        private static final int LENGTH_BYTES = 4;
+
+        // Public no-arg constructor: the job passes this class to setSortComparatorClass, so it
+        // is instantiated reflectively.
+        public RowKeyComparator() {
+            // Registers the key class so getKeyClass()/newKey() work; no key instances are created.
+            super(RowKeyWritable.class);
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return KV_COMPARATOR.compare(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java index b43e4a8..99b74ad 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java @@ -25,7 +25,6 @@ import java.io.File; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.kylin.common.util.Bytes; @@ -80,11 +79,8 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase { mapper.map(key, value, context); - ImmutableBytesWritable outKey = (ImmutableBytesWritable) outKV[0]; KeyValue outValue = (KeyValue) outKV[1]; - assertTrue(Bytes.compareTo(key.getBytes(), 0, key.getLength(), outKey.get(), outKey.getOffset(), outKey.getLength()) == 0); - assertTrue(Bytes.compareTo(value.getBytes(), 0, value.getLength(), outValue.getValueArray(), outValue.getValueOffset(), outValue.getValueLength()) == 0); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java index f8282d3..eba4a37 100644 --- 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java @@ -39,7 +39,7 @@ import org.junit.Test; */ public class CubeHFileMapperTest { - MapDriver<Text, Text, ImmutableBytesWritable, KeyValue> mapDriver; + MapDriver<Text, Text, RowKeyWritable, KeyValue> mapDriver; private String cube_name = "FLAT_ITEM_CUBE"; @@ -57,15 +57,15 @@ public class CubeHFileMapperTest { mapDriver.addInput(new Text("52010tech"), new Text("35.432")); - List<Pair<ImmutableBytesWritable, KeyValue>> result = mapDriver.run(); + List<Pair<RowKeyWritable, KeyValue>> result = mapDriver.run(); assertEquals(2, result.size()); byte[] bytes = { 0, 0, 0, 0, 0, 0, 0, 119, 33, 0, 22, 1, 0, 121, 7 }; ImmutableBytesWritable key = new ImmutableBytesWritable(bytes); - Pair<ImmutableBytesWritable, KeyValue> p1 = result.get(0); - Pair<ImmutableBytesWritable, KeyValue> p2 = result.get(1); + Pair<RowKeyWritable, KeyValue> p1 = result.get(0); + Pair<RowKeyWritable, KeyValue> p2 = result.get(1); assertEquals(key, p1.getFirst()); assertEquals("cf1", new String(p1.getSecond().getFamily()));