[CARBONDATA-2649] Fixed ArrayIndexOutOfBoundsException while loading Blocklet DataMap after alter add column operation
Things done as part of this PR Fixed ArrayIndexOutOfBoundsException while loading Blocklet DataMap after alter add column operation Problem: An ArrayIndexOutOfBoundsException was thrown after an alter add column operation. Analysis: After an alter add column operation, if COLUMN_META_CACHE is set on the newly added columns, then executing a select query on the data loaded before the alter operation threw an exception. This was because the min/max cache columns were fetched irrespective of the segmentProperties. Data loaded before the alter add column operation will not have the newly added columns in its columnSchemaList, and hence an exception can be thrown if non-existent columns are not removed from the min/max column cache. Solution: Fetch the min/max cache columns based on segmentProperties. This closes #2510 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8e789571 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8e789571 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8e789571 Branch: refs/heads/carbonstore Commit: 8e7895715753f13964887688fdf6e59d3dca5ed8 Parents: 7341907 Author: m00258959 <manish.gu...@huawei.com> Authored: Mon Jul 16 12:26:41 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon Jul 16 20:02:01 2018 +0530 ---------------------------------------------------------------------- .../block/SegmentPropertiesAndSchemaHolder.java | 14 ++++++---- .../indexstore/BlockletDataMapIndexStore.java | 3 ++- .../indexstore/blockletindex/BlockDataMap.java | 10 +++---- .../blockletindex/BlockletDataMap.java | 2 +- .../blockletindex/BlockletDataMapModel.java | 9 ------- .../core/metadata/schema/table/CarbonTable.java | 28 ++++++++++++++------ ...ithColumnMetCacheAndCacheLevelProperty.scala | 11 ++++++++ 7 files changed, 48 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java index e094076..bb7ff0d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java @@ -108,7 +108,7 @@ public class SegmentPropertiesAndSchemaHolder { this.segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); if (null == segmentIdSetAndIndexWrapper) { // create new segmentProperties - segmentPropertiesWrapper.initSegmentProperties(); + segmentPropertiesWrapper.initSegmentProperties(carbonTable); int segmentPropertiesIndex = segmentPropertiesIndexCounter.incrementAndGet(); indexToSegmentPropertiesWrapperMapping .put(segmentPropertiesIndex, segmentPropertiesWrapper); @@ -216,8 +216,11 @@ public class SegmentPropertiesAndSchemaHolder { * * @param segmentId * @param segmentPropertiesIndex + * @param clearSegmentWrapperFromMap flag to specify whether to clear segmentPropertiesWrapper + * from Map if all the segment's using it have become stale */ - public void invalidate(String segmentId, int segmentPropertiesIndex) { + public void invalidate(String segmentId, int segmentPropertiesIndex, + boolean clearSegmentWrapperFromMap) { SegmentPropertiesWrapper segmentPropertiesWrapper = indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex); if (null != segmentPropertiesWrapper) { @@ -230,7 +233,8 @@ public class SegmentPropertiesAndSchemaHolder { // if after removal of given SegmentId, the segmentIdSet becomes empty that means this // 
segmentPropertiesWrapper is not getting used at all. In that case this object can be // removed from all the holders - if (segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) { + if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet + .isEmpty()) { indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex); segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper); } @@ -254,11 +258,11 @@ public class SegmentPropertiesAndSchemaHolder { this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier(); this.columnsInTable = columnsInTable; this.columnCardinality = columnCardinality; - this.minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(); } - public void initSegmentProperties() { + public void initSegmentProperties(CarbonTable carbonTable) { segmentProperties = new SegmentProperties(columnsInTable, columnCardinality); + this.minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(segmentProperties); } @Override public boolean equals(Object obj) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 3918f3e..1501e6d 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -199,7 +199,8 @@ public class BlockletDataMapIndexStore // as segmentId will be same for all the dataMaps and segmentProperties cache is // maintained at segment level so it need to be called only once for clearing SegmentPropertiesAndSchemaHolder.getInstance() - .invalidate(segmentId, 
dataMaps.get(0).getSegmentPropertiesIndex()); + .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesIndex(), + tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafe()); } } lruCache.remove(tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier() http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index f8126cc..7baba89 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -224,7 +224,7 @@ public class BlockDataMap extends CoarseGrainDataMap byte[][] updatedMaxValues = CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false); summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties, - blockletDataMapInfo.getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow, + getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow, blockMetaInfo, updatedMinValues, updatedMaxValues); } } @@ -286,8 +286,8 @@ public class BlockDataMap extends CoarseGrainDataMap TableBlockInfo previousBlockInfo = previousDataFileFooter.getBlockInfo().getTableBlockInfo(); summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, previousDataFileFooter, - segmentProperties, blockletDataMapInfo.getMinMaxCacheColumns(), - previousBlockInfo.getFilePath(), summaryRow, + segmentProperties, getMinMaxCacheColumns(), previousBlockInfo.getFilePath(), + summaryRow, blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()), blockMinValues, blockMaxValues); // flag to check whether last file footer entry is 
different from previous entry. @@ -311,7 +311,7 @@ public class BlockDataMap extends CoarseGrainDataMap if (isLastFileFooterEntryNeedToBeAdded) { summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, previousDataFileFooter, segmentProperties, - blockletDataMapInfo.getMinMaxCacheColumns(), + getMinMaxCacheColumns(), previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath(), summaryRow, blockletDataMapInfo.getBlockMetaInfoMap() .get(previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath()), @@ -530,7 +530,7 @@ public class BlockDataMap extends CoarseGrainDataMap return false; } - private List<CarbonColumn> getMinMaxCacheColumns() { + protected List<CarbonColumn> getMinMaxCacheColumns() { return SegmentPropertiesAndSchemaHolder.getInstance() .getSegmentPropertiesWrapper(segmentPropertiesIndex).getMinMaxCacheColumns(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index bbe37c0..6a05442 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -126,7 +126,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable { relativeBlockletId = 0; } summaryRow = loadToUnsafe(schema, taskSummarySchema, fileFooter, segmentProperties, - blockletDataMapInfo.getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow, + getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow, blockMetaInfo, relativeBlockletId); // this is done because relative blocklet id need to be incremented based on the // total 
number of blocklets http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index 7516204..180c812 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -16,13 +16,11 @@ */ package org.apache.carbondata.core.indexstore.blockletindex; -import java.util.List; import java.util.Map; import org.apache.carbondata.core.datamap.dev.DataMapModel; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; /** * It is the model object to keep the information to build or initialize BlockletDataMap. 
@@ -35,8 +33,6 @@ public class BlockletDataMapModel extends DataMapModel { private CarbonTable carbonTable; - private List<CarbonColumn> minMaxCacheColumns; - private String segmentId; private boolean addToUnsafe = true; @@ -48,7 +44,6 @@ public class BlockletDataMapModel extends DataMapModel { this.blockMetaInfoMap = blockMetaInfoMap; this.segmentId = segmentId; this.carbonTable = carbonTable; - this.minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(); } public BlockletDataMapModel(CarbonTable carbonTable, String filePath, @@ -77,8 +72,4 @@ public class BlockletDataMapModel extends DataMapModel { public CarbonTable getCarbonTable() { return carbonTable; } - - public List<CarbonColumn> getMinMaxCacheColumns() { - return minMaxCacheColumns; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index ffdd6b3..71256d4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -1195,11 +1196,14 @@ public class CarbonTable implements Serializable { /** * 
Method to find get carbon columns for columns to be cached. It will fill dimension first and - * then measures + * then measures based on the block segmentProperties. + * In alter add column scenarios it can happen that the newly added columns are being cached + * which do not exist in already loaded data. In those cases newly added columns should not be + * cached for the already loaded data * * @return */ - public List<CarbonColumn> getMinMaxCacheColumns() { + public List<CarbonColumn> getMinMaxCacheColumns(SegmentProperties segmentProperties) { List<CarbonColumn> minMaxCachedColsList = null; String tableName = tableInfo.getFactTable().getTableName(); String cacheColumns = @@ -1215,12 +1219,16 @@ public class CarbonTable implements Serializable { CarbonDimension dimension = getDimensionByName(tableName, column); // if found in dimension then add to dimension else add to measures if (null != dimension) { - // first add normal dimensions and then complex dimensions - if (dimension.isComplex()) { - complexDimensions.add(dimension); - continue; + CarbonDimension dimensionFromCurrentBlock = + segmentProperties.getDimensionFromCurrentBlock(dimension); + if (null != dimensionFromCurrentBlock) { + // first add normal dimensions and then complex dimensions + if (dimensionFromCurrentBlock.isComplex()) { + complexDimensions.add(dimensionFromCurrentBlock); + continue; + } + minMaxCachedColsList.add(dimensionFromCurrentBlock); } - minMaxCachedColsList.add(dimension); } else { measureColumns.add(column); } @@ -1231,7 +1239,11 @@ public class CarbonTable implements Serializable { for (String measureColumn : measureColumns) { CarbonMeasure measure = getMeasureByName(tableName, measureColumn); if (null != measure) { - minMaxCachedColsList.add(measure); + CarbonMeasure measureFromCurrentBlock = + segmentProperties.getMeasureFromCurrentBlock(measure.getColumnId()); + if (null != measureFromCurrentBlock) { + minMaxCachedColsList.add(measureFromCurrentBlock); + } } } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e789571/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala index 3e1f188..af9930a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala @@ -257,4 +257,15 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be Row(5)) } + test("verify column caching with alter add column") { + sql("drop table if exists alter_add_column_min_max") + sql("create table alter_add_column_min_max (imei string,AMSize string,channelsId string,ActiveCountry string, Activecity string,gamePointId double,deviceInformationId double,productionDate Timestamp,deliveryDate timestamp,deliverycharge double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('table_blocksize'='1','COLUMN_META_CACHE'='AMSize','CACHE_LEVEL'='BLOCKLET')") + sql("insert into alter_add_column_min_max select '1AA1','8RAM size','4','Chinese','guangzhou',2738,1,'2014-07-01 12:07:28','2014-07-01 12:07:28',25") + sql("alter table alter_add_column_min_max add columns(age int, name string)") + sql("ALTER TABLE alter_add_column_min_max SET TBLPROPERTIES('COLUMN_META_CACHE'='age,name')") + sql("insert into alter_add_column_min_max select '1AA1','8RAM size','4','Chinese','guangzhou',2738,1,'2014-07-01 
12:07:28','2014-07-01 12:07:28',25,29,'Rahul'") + checkAnswer(sql("select count(*) from alter_add_column_min_max where AMSize='8RAM size'"), Row(2)) + sql("drop table if exists alter_add_column_min_max") + } + }