This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 10429e4  KYLIN-4184 Real time OLAP query gets wrong result
10429e4 is described below

commit 10429e4cf0966e4dd46e9dcad4e4298a3fc6cc30
Author: wangxiaojing <wangxiaoj...@didichuxing.com>
AuthorDate: Mon Sep 30 15:46:37 2019 +0800

    KYLIN-4184 Real time OLAP query gets wrong result
---
 .../storage/columnar/FragmentSearchResult.java     | 11 ++++-----
 .../stream/core/storage/columnar/RawRecord.java    | 26 ++++++++++++++++++----
 .../compress/RunLengthCompressedColumnReader.java  |  6 +++--
 3 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java
index 920926e..ebf3c8a 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentSearchResult.java
@@ -344,10 +344,6 @@ public class FragmentSearchResult implements IStreamingSearchResult {
 
             public boolean aggregate(RawRecord r) {
                 byte[][] dimVals = r.getDimensions();
-//            for (int i = 0; i < dimVals.length; i++) {
-//                copyDimVals[i] = new byte[dimVals[i].length];
-//                System.arraycopy(dimVals[i], 0, copyDimVals[i], 0, 
dimVals[i].length);
-//            }
                 byte[][] metricsVals = r.getMetrics();
                 MeasureAggregator[] aggrs = aggBufMap.get(dimVals);
                 if (aggrs == null) {
@@ -356,7 +352,12 @@ public class FragmentSearchResult implements IStreamingSearchResult {
                         return false;
                     }
                     byte[][] copyDimVals = new 
byte[schema.getDimensionCount()][];
-                    System.arraycopy(dimVals, 0, copyDimVals, 0, 
dimVals.length);
+
+                    for(int i=0;i<dimVals.length;i++){
+                        copyDimVals[i] = new byte[dimVals[i].length];
+                        System.arraycopy(dimVals[i], 0, copyDimVals[i], 0, 
dimVals[i].length);
+                    }
+
                     aggrs = newAggregators();
                     aggBufMap.put(copyDimVals, aggrs);
                 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java
index 952419a..1801d95 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/RawRecord.java
@@ -38,14 +38,32 @@ public class RawRecord {
         if (another.getDimensions().length != dimensions.length || 
another.getMetrics().length != metrics.length) {
             throw new IllegalStateException("cannot copy record with different 
schema");
         }
-        System.arraycopy(another.dimensions, 0, this.dimensions, 0, 
another.dimensions.length);
-        System.arraycopy(another.metrics, 0, this.metrics, 0, 
another.metrics.length);
+
+        for(int i=0;i<another.dimensions.length;i++){
+            this.dimensions[i] = new byte[another.dimensions[i].length];
+            System.arraycopy(another.dimensions[i], 0, this.dimensions[i], 0, 
another.dimensions[i].length);
+        }
+
+        for(int i=0;i<another.metrics.length;i++){
+            this.metrics[i]=new byte[another.metrics[i].length];
+            System.arraycopy(another.metrics[i], 0, this.metrics[i], 0, 
another.metrics[i].length);
+        }
+
     }
 
     public RawRecord clone() {
         RawRecord rawRecord = new RawRecord(dimensions.length, metrics.length);
-        System.arraycopy(dimensions, 0, rawRecord.dimensions, 0, 
dimensions.length);
-        System.arraycopy(metrics, 0, rawRecord.metrics, 0, metrics.length);
+
+        for(int i=0;i<dimensions.length;i++){
+            rawRecord.dimensions[i]=new byte[dimensions[i].length];
+            System.arraycopy(dimensions[i], 0, rawRecord.dimensions[i], 0, 
dimensions[i].length);
+        }
+
+        for(int i=0;i<metrics.length;i++){
+            rawRecord.metrics[i]=new byte[metrics[i].length];
+            System.arraycopy(metrics[i], 0, rawRecord.metrics[i], 0, 
metrics[i].length);
+        }
+
         return rawRecord;
     }
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
index a4aee1d..5346a72 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
@@ -31,7 +31,7 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader {
     private ByteBuffer currBlockBuffer;
     private int currBlockNum;
 
-    private byte[] readBuffer;
+    //private byte[] readBuffer;
     private int rowCount;
     private GeneralColumnDataReader blockDataReader;
 
@@ -45,7 +45,7 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader {
 
         this.blockDataReader = new GeneralColumnDataReader(dataBuffer, 
columnDataStartOffset, columnDataLength - 8);
         this.currBlockNum = -1;
-        this.readBuffer = new byte[valLen];
+        //this.readBuffer = new byte[valLen];
     }
 
     private void loadBuffer(int targetBlockNum) {
@@ -64,6 +64,8 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader {
 
     @Override
     public byte[] read(int rowNum) {
+        byte[] readBuffer = new byte[valLen];
+
         int targetBlockNum = rowNum / numValInBlock;
         if (targetBlockNum != currBlockNum) {
             loadBuffer(targetBlockNum);

Reply via email to