Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2992 [created] b837071a6


KYLIN-2992 Avoid GC in CubeHFileJob.Reducer


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b837071a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b837071a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b837071a

Branch: refs/heads/KYLIN-2992
Commit: b837071a6048433a0ec1708f358a62a8e90c2d1a
Parents: 31b16c3
Author: kangkaisen <kangkai...@meituan.com>
Authored: Fri Sep 22 13:36:23 2017 +0800
Committer: kangkaisen <kangkai...@meituan.com>
Committed: Fri Nov 3 17:33:52 2017 +0800

----------------------------------------------------------------------
 .../storage/hbase/steps/CreateHTableJob.java    |  8 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java | 26 ++++--
 .../storage/hbase/steps/CubeHFileMapper.java    | 14 ++--
 .../storage/hbase/steps/RowKeyWritable.java     | 86 ++++++++++++++++++++
 .../hbase/steps/CubeHFileMapper2Test.java       |  4 -
 .../hbase/steps/CubeHFileMapperTest.java        |  8 +-
 6 files changed, 123 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
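
The GC problem in a nutshell: the previous pipeline keyed map output by the raw
rowkey (ImmutableBytesWritable) and relied on HBase's KeyValueSortReducer, which
buffers and sorts every KeyValue of a row in an in-memory TreeSet before
emitting. This commit puts the full serialized KeyValue key into the map output
key (RowKeyWritable), lets the MapReduce shuffle do the sorting, and streams
each KeyValue straight through the reducer. A simplified sketch of the buffering
pattern being replaced, modeled on HBase's KeyValueSortReducer (0.98/1.x API,
not the exact HBase source):

    protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs, Context context)
            throws IOException, InterruptedException {
        // Every cell of the row is cloned and retained until the row is complete;
        // for wide cuboid rows this TreeSet is what drives the GC pressure.
        TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
        for (KeyValue kv : kvs) {
            try {
                sorted.add(kv.clone());
            } catch (CloneNotSupportedException e) {
                throw new IOException(e);
            }
        }
        for (KeyValue kv : sorted) {
            context.write(row, kv); // emitted only after the whole row is buffered
        }
    }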


http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index b2bab47..c41df06 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -29,7 +29,7 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -323,6 +323,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             double accumulatedSize = 0;
             int j = 0;
             for (Long cuboid : allCuboids) {
+
                 if (accumulatedSize >= hfileSizeMB) {
                     logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
@@ -337,10 +338,11 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(ImmutableBytesWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
+        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
 
         for (int i = 0; i < splits.size(); i++) {
-            hfilePartitionWriter.append(new ImmutableBytesWritable(splits.get(i)), NullWritable.get());
+            // The KV comparator compares the row part first, so a first-on-row key marks the boundary correctly.
+            hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get());
         }
         hfilePartitionWriter.close();
     }
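
Since the map output key is now a full serialized KeyValue key, the region split
points written to the partition file must live in the same key space, otherwise
the total-order partitioner would compare raw rowkey bytes against KeyValue-key
bytes. A short sketch of what each appended entry contains, assuming the HBase
0.98/1.x KeyValue layout:

    byte[] splitRow = Bytes.toBytes("region-boundary");    // example split point
    KeyValue first = KeyValue.createFirstOnRow(splitRow);  // smallest possible cell on that row
    byte[] kvKey = first.createKeyOnly(false).getKey();    // key part only, value dropped
    // kvKey layout: rowLen(2) | row | familyLen(1) | family | qualifier | timestamp(8) | type(1)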

http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 5f51c13..093e8ee 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -25,8 +25,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -39,6 +40,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -81,10 +83,6 @@ public class CubeHFileJob extends AbstractHadoopJob {
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
             FileOutputFormat.setOutputPath(job, output);
 
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(CubeHFileMapper.class);
-            job.setReducerClass(KeyValueSortReducer.class);
-
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             // add metadata to distributed cache
@@ -97,6 +95,13 @@ public class CubeHFileJob extends AbstractHadoopJob {
             HFileOutputFormat3.configureIncrementalLoad(job, htable);
             reconfigurePartitions(hbaseConf, partitionFilePath);
 
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(CubeHFileMapper.class);
+            job.setReducerClass(KeyValueReducer.class);
+            job.setMapOutputKeyClass(RowKeyWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+            job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class);
+
             // set block replication to 3 for hfiles
             hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
@@ -109,6 +114,17 @@ public class CubeHFileJob extends AbstractHadoopJob {
         }
     }
 
+    private static class KeyValueReducer extends KylinReducer<RowKeyWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+        private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
+        @Override
+        public void doReduce(RowKeyWritable row, Iterable<KeyValue> kvs, Context context) throws java.io.IOException, InterruptedException {
+            for (KeyValue kv : kvs) {
+                immutableBytesWritable.set(kv.getKey());
+                context.write(immutableBytesWritable, kv);
+            }
+        }
+    }
+
     /**
      * Check if there are partition files for the hfile; if yes, replace the table splits so that the job gets more reducers.
      * @param conf the job configuration
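
Note that the mapper/reducer wiring was moved below
HFileOutputFormat3.configureIncrementalLoad(job, htable): that call installs
HBase's own sort reducer and total-order partitioner, so Kylin's overrides have
to be applied afterwards to take effect. The effective order, condensed from the
hunks above:

    HFileOutputFormat3.configureIncrementalLoad(job, htable);  // installs HBase defaults
    // Kylin's settings are applied afterwards so they override those defaults:
    job.setReducerClass(KeyValueReducer.class);                // streams, never buffers a row
    job.setMapOutputKeyClass(RowKeyWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class);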

http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index f3d9f69..260db8d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeManager;
@@ -41,9 +40,7 @@ import com.google.common.collect.Lists;
  * @author George Song (ysong1)
  * 
  */
-public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWritable, KeyValue> {
-
-    ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+public class CubeHFileMapper extends KylinMapper<Text, Text, RowKeyWritable, KeyValue> {
 
     String cubeName;
     CubeDesc cubeDesc;
@@ -52,6 +49,8 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
     Object[] inputMeasures;
     List<KeyValueCreator> keyValueCreators;
 
+    private RowKeyWritable rowKeyWritable = new RowKeyWritable();
+
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -75,14 +74,14 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
 
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
-        outputKey.set(key.getBytes(), 0, key.getLength());
         KeyValue outputValue;
 
         int n = keyValueCreators.size();
         if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy
 
             outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
-            context.write(outputKey, outputValue);
+            rowKeyWritable.set(outputValue.createKeyOnly(false).getKey());
+            context.write(rowKeyWritable, outputValue);
 
         } else { // normal (complex) case that distributes measures to multiple HBase columns
 
@@ -90,7 +89,8 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
 
             for (int i = 0; i < n; i++) {
                 outputValue = keyValueCreators.get(i).create(key, inputMeasures);
-                context.write(outputKey, outputValue);
+                rowKeyWritable.set(outputValue.createKeyOnly(false).getKey());
+                context.write(rowKeyWritable, outputValue);
             }
         }
     }
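
Two details in the new map path: the single RowKeyWritable instance is reused
for every record (no per-record writable allocation), and the bytes it carries
are the serialized key of the emitted KeyValue, so the shuffle orders cells
exactly as HBase expects them in an HFile. The per-record emit, condensed:

    KeyValue kv = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
    rowKeyWritable.set(kv.createKeyOnly(false).getKey());  // serialized cell key, value stripped
    context.write(rowKeyWritable, kv);                     // the same writable is reused each call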

http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
new file mode 100644
index 0000000..599a3f4
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
+    private byte[] data;
+    private int offset;
+    private int length;
+    private KeyValue.KVComparator kvComparator = new KeyValue.KVComparator();
+
+    static {
+        WritableComparator.define(RowKeyWritable.class, new RowKeyComparator());
+    }
+
+    public RowKeyWritable() {
+        super();
+    }
+
+    public RowKeyWritable(byte[] bytes) {
+        this.data = bytes;
+        this.offset = 0;
+        this.length = bytes.length;
+    }
+
+    public void set(byte[] array) {
+        set(array, 0, array.length);
+    }
+
+    public void set(byte[] array, int offset, int length) {
+        this.data = array;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.length);
+        out.write(this.data, this.offset, this.length);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.length = in.readInt();
+        this.data = new byte[this.length];
+        in.readFully(this.data, 0, this.length);
+        this.offset = 0;
+    }
+
+    public int compareTo(RowKeyWritable other) {
+        return kvComparator.compare(this.data, this.offset, this.length, other.data, other.offset, other.length);
+    }
+
+    public static class RowKeyComparator extends WritableComparator {
+        private KeyValue.KVComparator kvComparator = new KeyValue.KVComparator();
+        private static final int LENGTH_BYTES = 4;
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return kvComparator.compare(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+        }
+    }
+}
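
RowKeyWritable serializes as a 4-byte length followed by the raw KeyValue-key
bytes, which is why RowKeyComparator can skip LENGTH_BYTES and hand the rest
straight to KVComparator: records are compared in serialized form during the
shuffle, with no deserialization and no per-comparison allocation. A minimal
usage sketch under the same HBase API assumptions as the code above:

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowKeyWritableDemo {
        public static void main(String[] args) {
            byte[] k1 = KeyValue.createFirstOnRow(Bytes.toBytes("row1")).createKeyOnly(false).getKey();
            byte[] k2 = KeyValue.createFirstOnRow(Bytes.toBytes("row2")).createKeyOnly(false).getKey();
            RowKeyWritable w1 = new RowKeyWritable(k1);
            RowKeyWritable w2 = new RowKeyWritable(k2);
            System.out.println(w1.compareTo(w2) < 0);  // true: "row1" sorts before "row2"
        }
    }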

http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
index b43e4a8..99b74ad 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
@@ -25,7 +25,6 @@ import java.io.File;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.kylin.common.util.Bytes;
@@ -80,11 +79,8 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase {
 
         mapper.map(key, value, context);
 
-        ImmutableBytesWritable outKey = (ImmutableBytesWritable) outKV[0];
         KeyValue outValue = (KeyValue) outKV[1];
 
-        assertTrue(Bytes.compareTo(key.getBytes(), 0, key.getLength(), outKey.get(), outKey.getOffset(), outKey.getLength()) == 0);
-
         assertTrue(Bytes.compareTo(value.getBytes(), 0, value.getLength(), outValue.getValueArray(), outValue.getValueOffset(), outValue.getValueLength()) == 0);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/b837071a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
index f8282d3..eba4a37 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
@@ -39,7 +39,7 @@ import org.junit.Test;
  */
 public class CubeHFileMapperTest {
 
-    MapDriver<Text, Text, ImmutableBytesWritable, KeyValue> mapDriver;
+    MapDriver<Text, Text, RowKeyWritable, KeyValue> mapDriver;
 
     private String cube_name = "FLAT_ITEM_CUBE";
 
@@ -57,15 +57,15 @@ public class CubeHFileMapperTest {
 
         mapDriver.addInput(new Text("52010tech"), new Text("35.432"));
 
-        List<Pair<ImmutableBytesWritable, KeyValue>> result = mapDriver.run();
+        List<Pair<RowKeyWritable, KeyValue>> result = mapDriver.run();
 
         assertEquals(2, result.size());
 
         byte[] bytes = { 0, 0, 0, 0, 0, 0, 0, 119, 33, 0, 22, 1, 0, 121, 7 };
         ImmutableBytesWritable key = new ImmutableBytesWritable(bytes);
 
-        Pair<ImmutableBytesWritable, KeyValue> p1 = result.get(0);
-        Pair<ImmutableBytesWritable, KeyValue> p2 = result.get(1);
+        Pair<RowKeyWritable, KeyValue> p1 = result.get(0);
+        Pair<RowKeyWritable, KeyValue> p2 = result.get(1);
 
         assertEquals(key, p1.getFirst());
         assertEquals("cf1", new String(p1.getSecond().getFamily()));
