Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-942 9cf1d1be8 -> eaef279c7
fix bug in hbase cuboid writer Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/eaef279c Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/eaef279c Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/eaef279c Branch: refs/heads/KYLIN-942 Commit: eaef279c71bb7394b22b3a4084766008d298aec2 Parents: 9cf1d1b Author: honma <ho...@ebay.com> Authored: Thu Sep 24 10:56:11 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Thu Sep 24 10:56:11 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/kv/RowConstants.java | 1 + .../mr/steps/MapContextGTRecordWriter.java | 6 ++--- .../kylin/job/streaming/CubeStreamConsumer.java | 5 ++-- .../storage/hbase/steps/HBaseCuboidWriter.java | 26 ++++++++++++++++---- .../hbase/steps/HBaseStreamingOutput.java | 2 +- 5 files changed, 28 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java index af4adbb..c5adfb5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java @@ -43,6 +43,7 @@ public class RowConstants { public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7); public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 }; + public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB // marker class http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java index 58e820f..402bec0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java @@ -61,8 +61,8 @@ public class MapContextGTRecordWriter implements ICuboidWriter { } cuboidRowCount++; - int preamble = RowConstants.ROWKEY_HEADER_LEN; - int offSet = preamble; + int header = RowConstants.ROWKEY_HEADER_LEN; + int offSet = header; for (int x = 0; x < dimensions; x++) { System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length()); offSet += record.get(x).length(); @@ -70,7 +70,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter { //fill shard short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId); - short shardOffset = ShardingHash.getShard(keyBuf, preamble, offSet - preamble, cuboidShardNum); + short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum); Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java index 09a9826..0fbe975 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java @@ -45,12 +45,11 @@ import org.apache.kylin.dict.Dictionary; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CuboidStatsUtil; -import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; import org.apache.kylin.storage.hbase.steps.HBaseConnection; import org.apache.kylin.storage.hbase.steps.HBaseCuboidWriter; -import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; import org.apache.kylin.streaming.MicroStreamBatch; import org.apache.kylin.streaming.MicroStreamBatchConsumer; import org.slf4j.Logger; @@ -108,7 +107,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer { InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), realDictMap); final HTableInterface hTable = createHTable(cubeSegment); - final HBaseCuboidWriter gtRecordWriter = new HBaseCuboidWriter(cubeDesc, hTable); + final HBaseCuboidWriter gtRecordWriter = new HBaseCuboidWriter(cubeSegment, hTable); executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get(); gtRecordWriter.flush(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index 1271070..bc0abc0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -43,9 +43,13 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.ShardingHash; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; @@ -68,12 +72,14 @@ public final class HBaseCuboidWriter implements ICuboidWriter { private final HTableInterface hTable; private final ByteBuffer byteBuffer; private final CubeDesc cubeDesc; + private final CubeSegment cubeSegment; private final Object[] measureValues; private List<Put> puts = Lists.newArrayList(); - public HBaseCuboidWriter(CubeDesc cubeDesc, HTableInterface hTable) { + public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) { this.keyValueCreators = Lists.newArrayList(); - this.cubeDesc = cubeDesc; + this.cubeSegment = segment; + this.cubeDesc = cubeSegment.getCubeDesc(); for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) { for (HBaseColumnDesc colDesc : cfDesc.getColumns()) { keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc)); @@ -81,7 +87,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter { } this.nColumns = keyValueCreators.size(); this.hTable = hTable; - this.byteBuffer = ByteBuffer.allocate(1 << 20); + this.byteBuffer = ByteBuffer.allocate(RowConstants.ROWKEY_BUFFER_SIZE); this.measureValues = new Object[cubeDesc.getMeasures().size()]; } @@ -93,12 +99,22 @@ public final class HBaseCuboidWriter implements ICuboidWriter { private ByteBuffer createKey(Long cuboidId, GTRecord record) { byteBuffer.clear(); - byteBuffer.put(Bytes.toBytes(cuboidId)); + byteBuffer.put(Bytes.toBytes((short) 0), 0, RowConstants.ROWKEY_SHARDID_LEN);//occupy space first + byteBuffer.put(Bytes.toBytes(cuboidId), 0, RowConstants.ROWKEY_CUBOIDID_LEN); final int cardinality = BitSet.valueOf(new long[] { cuboidId }).cardinality(); for (int i = 0; i < cardinality; i++) { final ByteArray byteArray = record.get(i); byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length()); } + + //fill shard + short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId); + short shardOffset = ShardingHash.getShard(byteBuffer.array(), // + RowConstants.ROWKEY_HEADER_LEN, byteBuffer.position() - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum); + Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); + short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); + BytesUtil.writeShort(finalShard, byteBuffer.array(), 0, RowConstants.ROWKEY_SHARDID_LEN); + return byteBuffer; } @@ -108,7 +124,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter { final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); final int nDims = cuboid.getColumns().size(); final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size()); - + for (int i = 0; i < nColumns; i++) { final Object[] values = record.getValues(bitSet, measureValues); final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), values); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/eaef279c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index b81d35e..e4617b7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -52,7 +52,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { CubeSegment cubeSegment = (CubeSegment) buildable; final HTableInterface hTable; hTable = createHTable(cubeSegment); - return new HBaseCuboidWriter(cubeSegment.getCubeDesc(), hTable); + return new HBaseCuboidWriter(cubeSegment, hTable); } catch (IOException e) { throw new RuntimeException("failed to get ICuboidWriter", e); }