Repository: kylin Updated Branches: refs/heads/2.x-staging 4d4e743c7 -> 2ac2f56a1
http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 6981214..6acaa8b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.util.KryoUtils; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; @@ -132,8 +131,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { this.serviceStartTime = System.currentTimeMillis(); - GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class); - RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class); + GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); + RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java index 10e80ae..76c6637 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java @@ -34,7 +34,6 @@ import org.apache.kylin.measure.hllc.HLLCMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -214,7 +213,7 @@ public class EndpointAggregators { } return length; } - + public List<Object> deserializeMetricValues(ByteBuffer buffer) { List<Object> ret = Lists.newArrayList(); for (int i = 0; i < measureSerializers.length; i++) { @@ -226,7 +225,7 @@ public class EndpointAggregators { } public static byte[] serialize(EndpointAggregators o) { - ByteBuffer buf = ByteBuffer.allocate(CoprocessorConstants.SERIALIZE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); serializer.serialize(o, buf); byte[] result = new byte[buf.position()]; System.arraycopy(buf.array(), 0, result, 0, buf.position()); @@ -237,9 +236,7 @@ public class EndpointAggregators { return serializer.deserialize(ByteBuffer.wrap(bytes)); } - private static final Serializer serializer = new Serializer(); - - private static class Serializer implements BytesSerializer<EndpointAggregators> { + private static final BytesSerializer<EndpointAggregators> serializer = new BytesSerializer<EndpointAggregators>() { @Override public void serialize(EndpointAggregators value, ByteBuffer out) { @@ -278,7 +275,7 @@ public class EndpointAggregators { return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo); } - } + }; public int getMeasureSerializeLength() { int length = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java index 0fe9898..af7b993 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.Array; +import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.Dictionary; @@ -50,7 +51,6 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; @@ -65,7 +65,6 @@ import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; - import it.uniroma3.mat.extendedset.intset.ConciseSet; /** @@ -206,7 +205,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary); byte[] recordBuffer = new byte[recordInfo.getByteFormLen()]; - byte[] buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE]; + byte[] buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE]; int iteratedSliceCount = 0; long latestSliceTs = Long.MIN_VALUE; @@ -274,7 +273,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop AggrKey aggrKey = entry.getKey(); IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length())); if (offset + measureLength > buffer.length) { - buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE]; + buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE]; offset = 0; } int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset);