Supported IUD for vector reader. Fixed comments.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/64f973e8 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/64f973e8 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/64f973e8 Branch: refs/heads/branch-1.1 Commit: 64f973e86b730e6454ef9b6d8a1e50dd6e8a85e5 Parents: ef583af Author: ravipesala <ravi.pes...@gmail.com> Authored: Wed May 31 20:54:49 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 13:15:42 2017 +0530 ---------------------------------------------------------------------- .../DictionaryBasedVectorResultCollector.java | 5 +- .../core/scan/result/AbstractScannedResult.java | 25 +++- .../scan/result/vector/CarbonColumnVector.java | 9 ++ .../scan/result/vector/CarbonColumnarBatch.java | 33 ++++- .../dataload/TestBatchSortDataLoad.scala | 20 +-- .../iud/UpdateCarbonTableTestCase.scala | 2 +- .../vectorreader/ColumnarVectorWrapper.java | 130 ++++++++++++++++--- .../VectorizedCarbonRecordReader.java | 5 +- .../spark/sql/hive/CarbonAnalysisRules.scala | 8 +- .../spark/sql/hive/CarbonSessionState.scala | 3 +- 10 files changed, 198 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java index 91afe77..7a8fe06 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java @@ -144,6 +144,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC return; } fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows); + scannedResult.markFilteredRows( + columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter()); scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows); } } @@ -162,7 +164,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC // Or set the row counter. scannedResult.setRowCounter(rowCounter + requiredRows); } - columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows); + columnarBatch.setActualSize( + columnarBatch.getActualSize() + requiredRows - columnarBatch.getRowsFilteredCount()); columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java index e57a290..a1074ea 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; import 
org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -283,7 +284,8 @@ public abstract class AbstractScannedResult { String data = getBlockletId(); if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID .equals(columnVectorInfo.dimension.getColumnName())) { - data = data + CarbonCommonConstants.FILE_SEPARATOR + j; + data = data + CarbonCommonConstants.FILE_SEPARATOR + + (rowMapping == null ? j : rowMapping[pageCounter][j]); } vector.putBytes(vectorOffset++, offset, data.length(), data.getBytes()); } @@ -638,4 +640,25 @@ public abstract class AbstractScannedResult { BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) { this.blockletDeleteDeltaCache = blockletDeleteDeltaCache; } + + /** + * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later. + * @param columnarBatch + * @param startRow + * @param size + * @param vectorOffset + */ + public void markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size, + int vectorOffset) { + if (blockletDeleteDeltaCache != null) { + int len = startRow + size; + for (int i = startRow; i < len; i++) { + int rowId = rowMapping != null ? 
rowMapping[pageCounter][i] : i; + if (blockletDeleteDeltaCache.contains(rowId)) { + columnarBatch.markFiltered(vectorOffset); + } + vectorOffset++; + } + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java index 4952e07..a3eb48b 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java @@ -17,10 +17,15 @@ package org.apache.carbondata.core.scan.result.vector; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; public interface CarbonColumnVector { + void putBoolean(int rowId, boolean value); + + void putFloat(int rowId, float value); + void putShort(int rowId, short value); void putShorts(int rowId, int count, short value); @@ -59,4 +64,8 @@ public interface CarbonColumnVector { void reset(); + DataType getType(); + + void setFilteredRowsExist(boolean filteredRowsExist); + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java index faeffde..cfc2f16 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java @@ -17,6 +17,8 @@ 
package org.apache.carbondata.core.scan.result.vector; +import java.util.Arrays; + public class CarbonColumnarBatch { public CarbonColumnVector[] columnVectors; @@ -27,9 +29,15 @@ public class CarbonColumnarBatch { private int rowCounter; - public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) { + private boolean[] filteredRows; + + private int rowsFiltered; + + public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize, + boolean[] filteredRows) { this.columnVectors = columnVectors; this.batchSize = batchSize; + this.filteredRows = filteredRows; } public int getBatchSize() { @@ -47,6 +55,8 @@ public class CarbonColumnarBatch { public void reset() { actualSize = 0; rowCounter = 0; + rowsFiltered = 0; + Arrays.fill(filteredRows, false); for (int i = 0; i < columnVectors.length; i++) { columnVectors[i].reset(); } @@ -59,4 +69,25 @@ public class CarbonColumnarBatch { public void setRowCounter(int rowCounter) { this.rowCounter = rowCounter; } + + /** + * Mark the rows as filterd first before filling the batch, so that these rows will not be added + * to vector batches. 
+ * @param rowId + */ + public void markFiltered(int rowId) { + if (!filteredRows[rowId]) { + filteredRows[rowId] = true; + rowsFiltered++; + } + if (rowsFiltered == 1) { + for (int i = 0; i < columnVectors.length; i++) { + columnVectors[i].setFilteredRowsExist(true); + } + } + } + + public int getRowsFilteredCount() { + return rowsFiltered; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala index 43bcac8..d53b5e5 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala @@ -36,7 +36,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { val writer = new BufferedWriter(new FileWriter(file)) writer.write("c1,c2,c3, c4, c5, c6, c7, c8, c9, c10") writer.newLine() - for(i <- 0 until 200000) { + for(i <- 0 until 100000) { writer.write("a" + i%1000 + "," + "b" + i%1000 + "," + "c" + i%1000 + "," + @@ -84,9 +84,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") - checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(200000))) + checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(100000))) - assert(getIndexfileCount("carbon_load1") == 10, "Something wrong in batch sort") + assert(getIndexfileCount("carbon_load1") == 5, 
"Something wrong in batch sort") } test("test batch sort load by passing option to load command and compare with normal load") { @@ -115,7 +115,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") sql("alter table carbon_load1 compact 'major'") Thread.sleep(4000) - checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(800000))) + checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(400000))) assert(getIndexfileCount("carbon_load1", "0.1") == 1, "Something wrong in compaction after batch sort") @@ -137,7 +137,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ") - checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(800000))) + checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(400000))) checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"), sql("select * from carbon_load5 where c1='a1' order by c1")) @@ -165,9 +165,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load3 " + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1', 'single_pass'='true')") - checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(200000))) + checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(100000))) - assert(getIndexfileCount("carbon_load3") == 10, "Something wrong in batch sort") + assert(getIndexfileCount("carbon_load3") == 5, "Something wrong in batch sort") checkAnswer(sql("select * from carbon_load3 where c1='a1' order by c1"), sql("select * from carbon_load2 where c1='a1' order by c1")) @@ -186,9 +186,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load4 " ) - checkAnswer(sql("select 
count(*) from carbon_load4"), Seq(Row(200000))) + checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(100000))) - assert(getIndexfileCount("carbon_load4") == 10, "Something wrong in batch sort") + assert(getIndexfileCount("carbon_load4") == 5, "Something wrong in batch sort") CarbonProperties.getInstance(). addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) @@ -206,7 +206,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load6 " ) - checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(200000))) + checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(100000))) assert(getIndexfileCount("carbon_load6") == 1, "Something wrong in batch sort") CarbonProperties.getInstance(). http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 25fe91b..7917b61 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -42,7 +42,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false") + .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true") } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java index f94c0b2..c3d2a87 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java @@ -20,78 +20,165 @@ package org.apache.carbondata.spark.vectorreader; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; class ColumnarVectorWrapper implements CarbonColumnVector { private ColumnVector columnVector; - public ColumnarVectorWrapper(ColumnVector columnVector) { + private boolean[] filteredRows; + + private int counter; + + private boolean filteredRowsExist; + + public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) { this.columnVector = columnVector; + this.filteredRows = filteredRows; + } + + @Override public void putBoolean(int rowId, boolean value) { + if (!filteredRows[rowId]) { + columnVector.putBoolean(counter++, value); + } + } + + @Override public void putFloat(int rowId, float value) { + if (!filteredRows[rowId]) { + columnVector.putFloat(counter++, value); + } } @Override public void putShort(int rowId, short value) { - columnVector.putShort(rowId, value); + if (!filteredRows[rowId]) { + columnVector.putShort(counter++, value); + } } @Override public void putShorts(int rowId, int count, short value) { - columnVector.putShorts(rowId, count, value); + if 
(filteredRowsExist) { + for (int i = 0; i < count; i++) { + if (!filteredRows[rowId]) { + putShort(counter++, value); + } + rowId++; + } + } else { + columnVector.putShorts(rowId, count, value); + } } @Override public void putInt(int rowId, int value) { - columnVector.putInt(rowId, value); + if (!filteredRows[rowId]) { + columnVector.putInt(counter++, value); + } } @Override public void putInts(int rowId, int count, int value) { - columnVector.putInts(rowId, count, value); + if (filteredRowsExist) { + for (int i = 0; i < count; i++) { + if (!filteredRows[rowId]) { + putInt(counter++, value); + } + rowId++; + } + } else { + columnVector.putInts(rowId, count, value); + } } @Override public void putLong(int rowId, long value) { - columnVector.putLong(rowId, value); + if (!filteredRows[rowId]) { + columnVector.putLong(counter++, value); + } } @Override public void putLongs(int rowId, int count, long value) { - columnVector.putLongs(rowId, count, value); + if (filteredRowsExist) { + for (int i = 0; i < count; i++) { + if (!filteredRows[rowId]) { + putLong(counter++, value); + } + rowId++; + } + } else { + columnVector.putLongs(rowId, count, value); + } } @Override public void putDecimal(int rowId, Decimal value, int precision) { - columnVector.putDecimal(rowId, value, precision); + if (!filteredRows[rowId]) { + columnVector.putDecimal(counter++, value, precision); + } } @Override public void putDecimals(int rowId, int count, Decimal value, int precision) { for (int i = 0; i < count; i++) { - putDecimal(rowId++, value, precision); + if (!filteredRows[rowId]) { + putDecimal(counter++, value, precision); + } + rowId++; } } @Override public void putDouble(int rowId, double value) { - columnVector.putDouble(rowId, value); + if (!filteredRows[rowId]) { + columnVector.putDouble(counter++, value); + } } @Override public void putDoubles(int rowId, int count, double value) { - columnVector.putDoubles(rowId, count, value); + if (filteredRowsExist) { + for (int i = 0; i < count; 
i++) { + if (!filteredRows[rowId]) { + putDouble(counter++, value); + } + rowId++; + } + } else { + columnVector.putDoubles(rowId, count, value); + } } @Override public void putBytes(int rowId, byte[] value) { - columnVector.putByteArray(rowId, value); + if (!filteredRows[rowId]) { + columnVector.putByteArray(counter++, value); + } } @Override public void putBytes(int rowId, int count, byte[] value) { for (int i = 0; i < count; i++) { - putBytes(rowId++, value); + if (!filteredRows[rowId]) { + putBytes(counter++, value); + } + rowId++; } } @Override public void putBytes(int rowId, int offset, int length, byte[] value) { - columnVector.putByteArray(rowId, value, offset, length); + if (!filteredRows[rowId]) { + columnVector.putByteArray(counter++, value, offset, length); + } } @Override public void putNull(int rowId) { - columnVector.putNull(rowId); + if (!filteredRows[rowId]) { + columnVector.putNull(counter++); + } } @Override public void putNulls(int rowId, int count) { - columnVector.putNulls(rowId, count); + if (filteredRowsExist) { + for (int i = 0; i < count; i++) { + if (!filteredRows[rowId]) { + putNull(counter++); + } + rowId++; + } + } else { + columnVector.putNulls(rowId, count); + } } @Override public boolean isNull(int rowId) { @@ -108,6 +195,15 @@ class ColumnarVectorWrapper implements CarbonColumnVector { } @Override public void reset() { -// columnVector.reset(); + counter = 0; + filteredRowsExist = false; + } + + @Override public DataType getType() { + return columnVector.dataType(); + } + + @Override public void setFilteredRowsExist(boolean filteredRowsExist) { + this.filteredRowsExist = filteredRowsExist; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 3fdf9af..173c527 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -219,10 +219,11 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode); CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; + boolean[] filteredRows = new boolean[columnarBatch.capacity()]; for (int i = 0; i < fields.length; i++) { - vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i)); + vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows); } - carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity()); + carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows); } private void initBatch() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index 0fb5c47..c9fc46c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -79,13 +79,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] { } } -object CarbonIUDAnalysisRule extends Rule[LogicalPlan] { - - var sparkSession: SparkSession = _ - - def 
init(sparkSession: SparkSession) { - this.sparkSession = sparkSession - } +case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { private val parser = new SparkSqlParser(sparkSession.sessionState.conf) http://git-wip-us.apache.org/repos/asf/carbondata/blob/64f973e8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala index e413840..156a12e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -67,7 +67,6 @@ class CarbonSessionCatalog( lazy val carbonEnv = { val env = new CarbonEnv env.init(sparkSession) - CarbonIUDAnalysisRule.init(sparkSession) env } @@ -130,7 +129,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp catalog.ParquetConversions :: catalog.OrcConversions :: CarbonPreInsertionCasts :: - CarbonIUDAnalysisRule :: + CarbonIUDAnalysisRule(sparkSession) :: AnalyzeCreateTable(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) ::