Support IUD for vector reader

Fixed comments


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/64f973e8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/64f973e8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/64f973e8

Branch: refs/heads/branch-1.1
Commit: 64f973e86b730e6454ef9b6d8a1e50dd6e8a85e5
Parents: ef583af
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Wed May 31 20:54:49 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 13:15:42 2017 +0530

----------------------------------------------------------------------
 .../DictionaryBasedVectorResultCollector.java   |   5 +-
 .../core/scan/result/AbstractScannedResult.java |  25 +++-
 .../scan/result/vector/CarbonColumnVector.java  |   9 ++
 .../scan/result/vector/CarbonColumnarBatch.java |  33 ++++-
 .../dataload/TestBatchSortDataLoad.scala        |  20 +--
 .../iud/UpdateCarbonTableTestCase.scala         |   2 +-
 .../vectorreader/ColumnarVectorWrapper.java     | 130 ++++++++++++++++---
 .../VectorizedCarbonRecordReader.java           |   5 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |   8 +-
 .../spark/sql/hive/CarbonSessionState.scala     |   3 +-
 10 files changed, 198 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 91afe77..7a8fe06 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -144,6 +144,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
         return;
       }
       fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+      scannedResult.markFilteredRows(
+          columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
      scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
     }
   }
@@ -162,7 +164,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
       // Or set the row counter.
       scannedResult.setRowCounter(rowCounter + requiredRows);
     }
-    columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
+    columnarBatch.setActualSize(
+        columnarBatch.getActualSize() + requiredRows - columnarBatch.getRowsFilteredCount());
     columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
   }
 

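For readers tracing the arithmetic above: the batch's visible size now grows by the scanned rows minus whatever markFilteredRows flagged as deleted. A minimal, self-contained Java sketch with made-up numbers (illustration only, not CarbonData source):

public class BatchAccountingSketch {
  public static void main(String[] args) {
    int actualSize = 0;      // rows Spark will see in this batch so far
    int requiredRows = 100;  // rows the scanner filled in this pass
    int rowsFiltered = 3;    // rows the delete-delta marked as deleted
    actualSize += requiredRows - rowsFiltered;
    System.out.println(actualSize); // 97: deleted rows never surface
  }
}
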
http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index e57a290..a1074ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -283,7 +284,8 @@ public abstract class AbstractScannedResult {
         String data = getBlockletId();
         if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
             .equals(columnVectorInfo.dimension.getColumnName())) {
-          data = data + CarbonCommonConstants.FILE_SEPARATOR + j;
+          data = data + CarbonCommonConstants.FILE_SEPARATOR +
+              (rowMapping == null ? j : rowMapping[pageCounter][j]);
         }
        vector.putBytes(vectorOffset++, offset, data.length(), data.getBytes());
       }
@@ -638,4 +640,25 @@ public abstract class AbstractScannedResult {
       BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) {
     this.blockletDeleteDeltaCache = blockletDeleteDeltaCache;
   }
+
+  /**
+   * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
+   * @param columnarBatch batch whose slots are to be marked as filtered
+   * @param startRow first row of the scanned range to check
+   * @param size number of rows to check
+   * @param vectorOffset batch slot corresponding to startRow
+   */
+  public void markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
+      int vectorOffset) {
+    if (blockletDeleteDeltaCache != null) {
+      int len = startRow + size;
+      for (int i = startRow; i < len; i++) {
+        int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
+        if (blockletDeleteDeltaCache.contains(rowId)) {
+          columnarBatch.markFiltered(vectorOffset);
+        }
+        vectorOffset++;
+      }
+    }
+  }
 }
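
To make the row-id mapping concrete: a standalone sketch of the same loop, with a Set<Integer> standing in for BlockletLevelDeleteDeltaDataCache and a plain int[] for the current page's rowMapping (names and values are hypothetical):

import java.util.HashSet;
import java.util.Set;

public class MarkFilteredSketch {
  // Flags batch slots whose underlying row id appears in the delete delta.
  static void markFilteredRows(Set<Integer> deleteDelta, int[] rowMapping,
      boolean[] filtered, int startRow, int size, int vectorOffset) {
    for (int i = startRow; i < startRow + size; i++) {
      int rowId = (rowMapping != null) ? rowMapping[i] : i;
      if (deleteDelta.contains(rowId)) {
        filtered[vectorOffset] = true; // slot is skipped when vectors fill
      }
      vectorOffset++;
    }
  }

  public static void main(String[] args) {
    Set<Integer> deleted = new HashSet<>();
    deleted.add(12);
    int[] rowMapping = {10, 12, 14};  // rows surviving the filter
    boolean[] filtered = new boolean[3];
    markFilteredRows(deleted, rowMapping, filtered, 0, 3, 0);
    System.out.println(filtered[1]); // true: row 12 was deleted
  }
}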

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 4952e07..a3eb48b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -17,10 +17,15 @@
 
 package org.apache.carbondata.core.scan.result.vector;
 
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 public interface CarbonColumnVector {
 
+  void putBoolean(int rowId, boolean value);
+
+  void putFloat(int rowId, float value);
+
   void putShort(int rowId, short value);
 
   void putShorts(int rowId, int count, short value);
@@ -59,4 +64,8 @@ public interface CarbonColumnVector {
 
   void reset();
 
+  DataType getType();
+
+  void setFilteredRowsExist(boolean filteredRowsExist);
+
 }
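
A hedged sketch of what implementing the two new methods can look like (a toy class, not the real wrapper; assumes spark-sql on the classpath): getType() reports the Spark DataType the vector holds, and setFilteredRowsExist() tells the vector to fall back from bulk puts to row-by-row puts so filtered slots can be skipped.

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

class ToyIntVector {
  private boolean filteredRowsExist;

  public DataType getType() {
    return DataTypes.IntegerType; // this toy vector only holds ints
  }

  public void setFilteredRowsExist(boolean filteredRowsExist) {
    // when true, bulk puts must degrade to per-row puts
    this.filteredRowsExist = filteredRowsExist;
  }
}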

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index faeffde..cfc2f16 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.scan.result.vector;
 
+import java.util.Arrays;
+
 public class CarbonColumnarBatch {
 
   public CarbonColumnVector[] columnVectors;
@@ -27,9 +29,15 @@ public class CarbonColumnarBatch {
 
   private int rowCounter;
 
-  public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) {
+  private boolean[] filteredRows;
+
+  private int rowsFiltered;
+
+  public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize,
+      boolean[] filteredRows) {
     this.columnVectors = columnVectors;
     this.batchSize = batchSize;
+    this.filteredRows = filteredRows;
   }
 
   public int getBatchSize() {
@@ -47,6 +55,8 @@ public class CarbonColumnarBatch {
   public void reset() {
     actualSize = 0;
     rowCounter = 0;
+    rowsFiltered = 0;
+    Arrays.fill(filteredRows, false);
     for (int i = 0; i < columnVectors.length; i++) {
       columnVectors[i].reset();
     }
@@ -59,4 +69,25 @@ public class CarbonColumnarBatch {
   public void setRowCounter(int rowCounter) {
     this.rowCounter = rowCounter;
   }
+
+  /**
+   * Mark the rows as filtered first before filling the batch, so that these rows will not be added
+   * to vector batches.
+   * @param rowId batch slot to mark as filtered
+   */
+  public void markFiltered(int rowId) {
+    if (!filteredRows[rowId]) {
+      filteredRows[rowId] = true;
+      rowsFiltered++;
+    }
+    if (rowsFiltered == 1) {
+      for (int i = 0; i < columnVectors.length; i++) {
+        columnVectors[i].setFilteredRowsExist(true);
+      }
+    }
+  }
+
+  public int getRowsFilteredCount() {
+    return rowsFiltered;
+  }
 }
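
The markFiltered() bookkeeping can be exercised in isolation. This stand-in sketch (plain arrays, not the real class) shows that double-marking a row counts once and that reset() clears the bitmap for the next batch:

import java.util.Arrays;

public class FilterBitmapSketch {
  public static void main(String[] args) {
    boolean[] filteredRows = new boolean[4];
    int rowsFiltered = 0;
    for (int rowId : new int[] {1, 1, 3}) { // row 1 marked twice
      if (!filteredRows[rowId]) {
        filteredRows[rowId] = true;
        rowsFiltered++; // duplicate marks do not inflate the count
      }
    }
    System.out.println(rowsFiltered); // 2
    Arrays.fill(filteredRows, false); // what reset() does between batches
  }
}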

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 43bcac8..d53b5e5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -36,7 +36,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
     val writer = new BufferedWriter(new FileWriter(file))
     writer.write("c1,c2,c3, c4, c5, c6, c7, c8, c9, c10")
     writer.newLine()
-    for(i <- 0 until 200000) {
+    for(i <- 0 until 100000) {
       writer.write("a" + i%1000 + "," +
                    "b" + i%1000 + "," +
                    "c" + i%1000 + "," +
@@ -84,9 +84,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
         s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
 
-    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(200000)))
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(100000)))
 
-    assert(getIndexfileCount("carbon_load1") == 10, "Something wrong in batch 
sort")
+    assert(getIndexfileCount("carbon_load1") == 5, "Something wrong in batch 
sort")
   }
 
   test("test batch sort load by passing option to load command and compare 
with normal load") {
@@ -115,7 +115,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
         s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
     sql("alter table carbon_load1 compact 'major'")
     Thread.sleep(4000)
-    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(800000)))
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(400000)))
 
     assert(getIndexfileCount("carbon_load1", "0.1") == 1, "Something wrong in 
compaction after batch sort")
 
@@ -137,7 +137,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
         s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
     sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ")
 
-    checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(800000)))
+    checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(400000)))
 
     checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
       sql("select * from carbon_load5 where c1='a1' order by c1"))
@@ -165,9 +165,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load3 " +
         s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1', 
'single_pass'='true')")
 
-    checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(200000)))
+    checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(100000)))
 
-    assert(getIndexfileCount("carbon_load3") == 10, "Something wrong in batch 
sort")
+    assert(getIndexfileCount("carbon_load3") == 5, "Something wrong in batch 
sort")
 
     checkAnswer(sql("select * from carbon_load3 where c1='a1' order by c1"),
       sql("select * from carbon_load2 where c1='a1' order by c1"))
@@ -186,9 +186,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
 
     sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load4 " )
 
-    checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(200000)))
+    checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(100000)))
 
-    assert(getIndexfileCount("carbon_load4") == 10, "Something wrong in batch 
sort")
+    assert(getIndexfileCount("carbon_load4") == 5, "Something wrong in batch 
sort")
     CarbonProperties.getInstance().
       addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
         CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
@@ -206,7 +206,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
 
     sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load6 " )
 
-    checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(200000)))
+    checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(100000)))
 
     assert(getIndexfileCount("carbon_load6") == 1, "Something wrong in batch 
sort")
     CarbonProperties.getInstance().
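
The halved expectations follow from simple proportionality: with 'batch_sort_size_inmb'='1' each sorted batch of roughly 1 MB flushes one index file, so half the generated rows should yield half the files, assuming the bytes per row stay the same. In Java terms:

public class IndexCountSketch {
  public static void main(String[] args) {
    int rowsBefore = 200000, filesBefore = 10; // old test expectation
    int rowsAfter = 100000;                    // rows after this change
    int filesAfter = filesBefore * rowsAfter / rowsBefore;
    System.out.println(filesAfter);            // 5, the new assertion
  }
}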

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 25fe91b..7917b61 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -42,7 +42,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false")
+      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index f94c0b2..c3d2a87 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -20,78 +20,165 @@ package org.apache.carbondata.spark.vectorreader;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private ColumnVector columnVector;
 
-  public ColumnarVectorWrapper(ColumnVector columnVector) {
+  private boolean[] filteredRows;
+
+  private int counter;
+
+  private boolean filteredRowsExist;
+
+  public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
     this.columnVector = columnVector;
+    this.filteredRows = filteredRows;
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putBoolean(counter++, value);
+    }
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putFloat(counter++, value);
+    }
   }
 
   @Override public void putShort(int rowId, short value) {
-    columnVector.putShort(rowId, value);
+    if (!filteredRows[rowId]) {
+      columnVector.putShort(counter++, value);
+    }
   }
 
   @Override public void putShorts(int rowId, int count, short value) {
-    columnVector.putShorts(rowId, count, value);
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          putShort(rowId, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putShorts(rowId, count, value);
+    }
   }
 
   @Override public void putInt(int rowId, int value) {
-    columnVector.putInt(rowId, value);
+    if (!filteredRows[rowId]) {
+      columnVector.putInt(counter++, value);
+    }
   }
 
   @Override public void putInts(int rowId, int count, int value) {
-    columnVector.putInts(rowId, count, value);
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          putInt(rowId, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putInts(rowId, count, value);
+    }
   }
 
   @Override public void putLong(int rowId, long value) {
-    columnVector.putLong(rowId, value);
+    if (!filteredRows[rowId]) {
+      columnVector.putLong(counter++, value);
+    }
   }
 
   @Override public void putLongs(int rowId, int count, long value) {
-    columnVector.putLongs(rowId, count, value);
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          putLong(rowId, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putLongs(rowId, count, value);
+    }
   }
 
   @Override public void putDecimal(int rowId, Decimal value, int precision) {
-    columnVector.putDecimal(rowId, value, precision);
+    if (!filteredRows[rowId]) {
+      columnVector.putDecimal(counter++, value, precision);
+    }
   }
 
  @Override public void putDecimals(int rowId, int count, Decimal value, int precision) {
     for (int i = 0; i < count; i++) {
-      putDecimal(rowId++, value, precision);
+      if (!filteredRows[rowId]) {
+        putDecimal(rowId, value, precision);
+      }
+      rowId++;
     }
   }
 
   @Override public void putDouble(int rowId, double value) {
-    columnVector.putDouble(rowId, value);
+    if (!filteredRows[rowId]) {
+      columnVector.putDouble(counter++, value);
+    }
   }
 
   @Override public void putDoubles(int rowId, int count, double value) {
-    columnVector.putDoubles(rowId, count, value);
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          putDouble(rowId, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putDoubles(rowId, count, value);
+    }
   }
 
   @Override public void putBytes(int rowId, byte[] value) {
-    columnVector.putByteArray(rowId, value);
+    if (!filteredRows[rowId]) {
+      columnVector.putByteArray(counter++, value);
+    }
   }
 
   @Override public void putBytes(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
-      putBytes(rowId++, value);
+      if (!filteredRows[rowId]) {
+        putBytes(rowId, value);
+      }
+      rowId++;
     }
   }
 
  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
-    columnVector.putByteArray(rowId, value, offset, length);
+    if (!filteredRows[rowId]) {
+      columnVector.putByteArray(counter++, value, offset, length);
+    }
   }
 
   @Override public void putNull(int rowId) {
-    columnVector.putNull(rowId);
+    if (!filteredRows[rowId]) {
+      columnVector.putNull(counter++);
+    }
   }
 
   @Override public void putNulls(int rowId, int count) {
-    columnVector.putNulls(rowId, count);
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          putNull(rowId);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putNulls(rowId, count);
+    }
   }
 
   @Override public boolean isNull(int rowId) {
@@ -108,6 +195,15 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public void reset() {
-//    columnVector.reset();
+    counter = 0;
+    filteredRowsExist = false;
+  }
+
+  @Override public DataType getType() {
+    return columnVector.dataType();
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+    this.filteredRowsExist = filteredRowsExist;
   }
 }
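
The essential trick in this wrapper: rowId indexes the scanned page while the private counter indexes the Spark vector, so surviving rows land contiguously with no holes. A standalone sketch with plain arrays standing in for the ColumnVector API:

public class CompactionSketch {
  public static void main(String[] args) {
    boolean[] filteredRows = {false, true, false}; // row 1 is deleted
    int[] scanned = {10, 20, 30};                  // page values by rowId
    int[] sparkVector = new int[3];                // destination column
    int counter = 0;                               // next free batch slot
    for (int rowId = 0; rowId < scanned.length; rowId++) {
      if (!filteredRows[rowId]) {
        sparkVector[counter++] = scanned[rowId];   // putInt(counter++, value)
      }
    }
    System.out.println(counter); // 2 values written: 10 and 30, no gap
  }
}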

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 3fdf9af..173c527 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -219,10 +219,11 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
     columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i));
+      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
     }
-    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity());
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
   }
 
   private void initBatch() {
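
Note that a single boolean[] sized to the batch capacity is handed to every column wrapper and to the CarbonColumnarBatch, so one markFiltered() call hides a row from all columns at once. A tiny aliasing sketch with stand-in arrays:

public class SharedBitmapSketch {
  public static void main(String[] args) {
    boolean[] filteredRows = new boolean[1024]; // one bitmap per batch
    boolean[] col0 = filteredRows;              // each wrapper keeps a reference
    boolean[] col1 = filteredRows;
    filteredRows[7] = true;                     // batch marks row 7 as deleted
    System.out.println(col0[7] && col1[7]);     // true: visible to all columns
  }
}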

http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 0fb5c47..c9fc46c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -79,13 +79,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
   }
 }
 
-object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
-
-  var sparkSession: SparkSession = _
-
-  def init(sparkSession: SparkSession) {
-     this.sparkSession = sparkSession
-  }
+case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
   private val parser = new SparkSqlParser(sparkSession.sessionState.conf)
 

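The refactor replaces a mutable singleton initialized through init() with a rule constructed per session, removing shared state between SparkSessions. The pattern, sketched in plain Java with hypothetical names:

public class PerSessionRuleSketch {
  static final class Session {}

  // Before: one mutable static field, shared (and raced) by every session.
  static Session shared;
  static void init(Session s) { shared = s; }

  // After: the session is a constructor argument, as in the new case class.
  static final class IudRule {
    final Session session;
    IudRule(Session session) { this.session = session; }
  }

  public static void main(String[] args) {
    IudRule rule = new IudRule(new Session()); // one rule per session
    System.out.println(rule.session != null);  // true
  }
}
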
http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index e413840..156a12e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -67,7 +67,6 @@ class CarbonSessionCatalog(
   lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
-    CarbonIUDAnalysisRule.init(sparkSession)
     env
   }
 
@@ -130,7 +129,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
         CarbonPreInsertionCasts ::
-        CarbonIUDAnalysisRule ::
+        CarbonIUDAnalysisRule(sparkSession) ::
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
