This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 10429e4 KYLIN-4184 Real time OLAP query gets wrong result 10429e4 is described below commit 10429e4cf0966e4dd46e9dcad4e4298a3fc6cc30 Author: wangxiaojing <wangxiaoj...@didichuxing.com> AuthorDate: Mon Sep 30 15:46:37 2019 +0800 KYLIN-4184 Real time OLAP query gets wrong result --- .../storage/columnar/FragmentSearchResult.java | 11 ++++----- .../stream/core/storage/columnar/RawRecord.java | 26 ++++++++++++++++++---- .../compress/RunLengthCompressedColumnReader.java | 6 +++-- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java index 920926e..ebf3c8a 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java @@ -344,10 +344,6 @@ public class FragmentSearchResult implements IStreamingSearchResult { public boolean aggregate(RawRecord r) { byte[][] dimVals = r.getDimensions(); -// for (int i = 0; i < dimVals.length; i++) { -// copyDimVals[i] = new byte[dimVals[i].length]; -// System.arraycopy(dimVals[i], 0, copyDimVals[i], 0, dimVals[i].length); -// } byte[][] metricsVals = r.getMetrics(); MeasureAggregator[] aggrs = aggBufMap.get(dimVals); if (aggrs == null) { @@ -356,7 +352,12 @@ public class FragmentSearchResult implements IStreamingSearchResult { return false; } byte[][] copyDimVals = new byte[schema.getDimensionCount()][]; - System.arraycopy(dimVals, 0, copyDimVals, 0, dimVals.length); + + for(int i=0;i<dimVals.length;i++){ + copyDimVals[i] = new byte[dimVals[i].length]; + System.arraycopy(dimVals[i], 0, copyDimVals[i], 0, dimVals[i].length); + } + aggrs = newAggregators(); aggBufMap.put(copyDimVals, aggrs); } diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java index 952419a..1801d95 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java @@ -38,14 +38,32 @@ public class RawRecord { if (another.getDimensions().length != dimensions.length || another.getMetrics().length != metrics.length) { throw new IllegalStateException("cannot copy record with different schema"); } - System.arraycopy(another.dimensions, 0, this.dimensions, 0, another.dimensions.length); - System.arraycopy(another.metrics, 0, this.metrics, 0, another.metrics.length); + + for(int i=0;i<another.dimensions.length;i++){ + this.dimensions[i] = new byte[another.dimensions[i].length]; + System.arraycopy(another.dimensions[i], 0, this.dimensions[i], 0, another.dimensions[i].length); + } + + for(int i=0;i<another.metrics.length;i++){ + this.metrics[i]=new byte[another.metrics[i].length]; + System.arraycopy(another.metrics[i], 0, this.metrics[i], 0, another.metrics[i].length); + } + } public RawRecord clone() { RawRecord rawRecord = new RawRecord(dimensions.length, metrics.length); - System.arraycopy(dimensions, 0, rawRecord.dimensions, 0, dimensions.length); - System.arraycopy(metrics, 0, rawRecord.metrics, 0, metrics.length); + + for(int i=0;i<dimensions.length;i++){ + rawRecord.dimensions[i]=new byte[dimensions[i].length]; + System.arraycopy(dimensions[i], 0, rawRecord.dimensions[i], 0, dimensions[i].length); + } + + for(int i=0;i<metrics.length;i++){ + rawRecord.metrics[i]=new byte[metrics[i].length]; + System.arraycopy(metrics[i], 0, rawRecord.metrics[i], 0, metrics[i].length); + } + return rawRecord; } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java index a4aee1d..5346a72 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java @@ -31,7 +31,7 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader { private ByteBuffer currBlockBuffer; private int currBlockNum; - private byte[] readBuffer; + //private byte[] readBuffer; private int rowCount; private GeneralColumnDataReader blockDataReader; @@ -45,7 +45,7 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader { this.blockDataReader = new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength - 8); this.currBlockNum = -1; - this.readBuffer = new byte[valLen]; + //this.readBuffer = new byte[valLen]; } private void loadBuffer(int targetBlockNum) { @@ -64,6 +64,8 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader { @Override public byte[] read(int rowNum) { + byte[] readBuffer = new byte[valLen]; + int targetBlockNum = rowNum / numValInBlock; if (targetBlockNum != currBlockNum) { loadBuffer(targetBlockNum);