Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-942 9cf1d1be8 -> eaef279c7


fix bug in hbase cuboid writer


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/eaef279c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/eaef279c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/eaef279c

Branch: refs/heads/KYLIN-942
Commit: eaef279c71bb7394b22b3a4084766008d298aec2
Parents: 9cf1d1b
Author: honma <ho...@ebay.com>
Authored: Thu Sep 24 10:56:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Sep 24 10:56:11 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/kv/RowConstants.java  |  1 +
 .../mr/steps/MapContextGTRecordWriter.java      |  6 ++---
 .../kylin/job/streaming/CubeStreamConsumer.java |  5 ++--
 .../storage/hbase/steps/HBaseCuboidWriter.java  | 26 ++++++++++++++++----
 .../hbase/steps/HBaseStreamingOutput.java       |  2 +-
 5 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index af4adbb..c5adfb5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -43,6 +43,7 @@ public class RowConstants {
     public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7);
     public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 };
 
+    public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB
     public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB
 
     // marker class

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 58e820f..402bec0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -61,8 +61,8 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
         }
 
         cuboidRowCount++;
-        int preamble = RowConstants.ROWKEY_HEADER_LEN;
-        int offSet = preamble;
+        int header = RowConstants.ROWKEY_HEADER_LEN;
+        int offSet = header;
         for (int x = 0; x < dimensions; x++) {
             System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
             offSet += record.get(x).length();
@@ -70,7 +70,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
 
         //fill shard
         short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
-        short shardOffset = ShardingHash.getShard(keyBuf, preamble, offSet - preamble, cuboidShardNum);
+        short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum);
         Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
         short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
         BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 09a9826..0fbe975 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -45,12 +45,11 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
-import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
 import org.apache.kylin.storage.hbase.steps.HBaseConnection;
 import org.apache.kylin.storage.hbase.steps.HBaseCuboidWriter;
-import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
 import org.apache.kylin.streaming.MicroStreamBatch;
 import org.apache.kylin.streaming.MicroStreamBatchConsumer;
 import org.slf4j.Logger;
@@ -108,7 +107,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
 
         InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), realDictMap);
         final HTableInterface hTable = createHTable(cubeSegment);
-        final HBaseCuboidWriter gtRecordWriter = new HBaseCuboidWriter(cubeDesc, hTable);
+        final HBaseCuboidWriter gtRecordWriter = new HBaseCuboidWriter(cubeSegment, hTable);
 
         executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
         gtRecordWriter.flush();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index 1271070..bc0abc0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -43,9 +43,13 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
@@ -68,12 +72,14 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
     private final HTableInterface hTable;
     private final ByteBuffer byteBuffer;
     private final CubeDesc cubeDesc;
+    private final CubeSegment cubeSegment;
     private final Object[] measureValues;
     private List<Put> puts = Lists.newArrayList();
 
-    public HBaseCuboidWriter(CubeDesc cubeDesc, HTableInterface hTable) {
+    public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) {
         this.keyValueCreators = Lists.newArrayList();
-        this.cubeDesc = cubeDesc;
+        this.cubeSegment = segment;
+        this.cubeDesc = cubeSegment.getCubeDesc();
         for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
             for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
                 keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
@@ -81,7 +87,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         }
         this.nColumns = keyValueCreators.size();
         this.hTable = hTable;
-        this.byteBuffer = ByteBuffer.allocate(1 << 20);
+        this.byteBuffer = ByteBuffer.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
         this.measureValues = new Object[cubeDesc.getMeasures().size()];
     }
 
@@ -93,12 +99,22 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
 
     private ByteBuffer createKey(Long cuboidId, GTRecord record) {
         byteBuffer.clear();
-        byteBuffer.put(Bytes.toBytes(cuboidId));
+        byteBuffer.put(Bytes.toBytes((short) 0), 0, RowConstants.ROWKEY_SHARDID_LEN);//occupy space first
+        byteBuffer.put(Bytes.toBytes(cuboidId), 0, RowConstants.ROWKEY_CUBOIDID_LEN);
         final int cardinality = BitSet.valueOf(new long[] { cuboidId }).cardinality();
         for (int i = 0; i < cardinality; i++) {
             final ByteArray byteArray = record.get(i);
             byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
         }
+
+        //fill shard
+        short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
+        short shardOffset = ShardingHash.getShard(byteBuffer.array(), //
+                RowConstants.ROWKEY_HEADER_LEN, byteBuffer.position() - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
+        Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
+        short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+        BytesUtil.writeShort(finalShard, byteBuffer.array(), 0, RowConstants.ROWKEY_SHARDID_LEN);
+
         return byteBuffer;
     }
 
@@ -108,7 +124,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
         final int nDims = cuboid.getColumns().size();
         final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
-        
+
         for (int i = 0; i < nColumns; i++) {
             final Object[] values = record.getValues(bitSet, measureValues);
             final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), values);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index b81d35e..e4617b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -52,7 +52,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
             CubeSegment cubeSegment = (CubeSegment) buildable;
             final HTableInterface hTable;
             hTable = createHTable(cubeSegment);
-            return new HBaseCuboidWriter(cubeSegment.getCubeDesc(), hTable);
+            return new HBaseCuboidWriter(cubeSegment, hTable);
         } catch (IOException e) {
             throw new RuntimeException("failed to get ICuboidWriter", e);
         }

Reply via email to