[CARBONDATA-2727][BloomDataMap] Support creating bloom datamap on newly added column
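For context, a minimal usage sketch of the flow this commit enables (table/datamap names and property values here are illustrative, not part of the commit; the full scenario is in the BloomCoarseGrainDataMapSuite diff at the end of this mail):

    // hypothetical names; mirrors the test at the end of this mail
    sql("CREATE TABLE t1(id INT, name STRING) STORED BY 'carbondata'")
    sql("ALTER TABLE t1 ADD COLUMNS(num1 INT) TBLPROPERTIES('DEFAULT.VALUE.num1'='999')")
    // ALTER_ADD_COLUMN no longer invalidates bloom datamaps (see the
    // BloomCoarseGrainDataMapFactory diff below), so a bloom datamap can now
    // be created on, and rebuilt for, the newly added column
    sql("CREATE DATAMAP dm ON TABLE t1 USING 'bloomfilter' " +
      "DMProperties('INDEX_COLUMNS'='num1', 'BLOOM_SIZE'='640000')")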
Add a result collector with rowId information for datamap rebuild in case the table schema has changed;
Use the keygenerator to retrieve the surrogate value of a dictionary index column from the query result
(see the decoding sketch after the BloomCoarseGrainDataMapFactory diff below);
This closes #2490

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/81038f55
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/81038f55
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/81038f55

Branch: refs/heads/carbonstore
Commit: 81038f55ef9a582f82305378988f603ded76e524
Parents: aec47e0
Author: Manhua <kevin...@qq.com>
Authored: Wed Jul 11 19:39:31 2018 +0800
Committer: xuchuanyin <xuchuan...@hust.edu.cn>
Committed: Tue Jul 17 23:31:43 2018 +0800

----------------------------------------------------------------------
 .../scan/collector/ResultCollectorFactory.java  |  31 ++--
 ...RowIdRestructureBasedRawResultCollector.java | 138 +++++++++++++++++++
 .../bloom/AbstractBloomDataMapWriter.java       |  72 +---------
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   2 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |   8 ++
 .../datamap/bloom/BloomDataMapWriter.java       |  72 ++++++++++
 .../datamap/IndexDataMapRebuildRDD.scala        | 131 +++++++++++-------
 .../bloom/BloomCoarseGrainDataMapSuite.scala    |  96 +++++++++++++
 8 files changed, 413 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
index ea4afd1..e0a0b90 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
@@ -18,15 +18,7 @@ package org.apache.carbondata.core.scan.collector;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.collector.impl.AbstractScannedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedDictionaryResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdRawBasedResultCollector;
+import org.apache.carbondata.core.scan.collector.impl.*;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 
 /**
@@ -51,14 +43,21 @@ public class ResultCollectorFactory {
     AbstractScannedResultCollector scannerResultAggregator = null;
     if (blockExecutionInfo.isRawRecordDetailQuery()) {
       if (blockExecutionInfo.isRestructuredBlock()) {
-        LOGGER.info("Restructure based raw collector is used to scan and collect the data");
-        scannerResultAggregator = new RestructureBasedRawResultCollector(blockExecutionInfo);
-      } else if (blockExecutionInfo.isRequiredRowId()) {
-        LOGGER.info("RowId based raw collector is used to scan and collect the data");
-        scannerResultAggregator = new RowIdRawBasedResultCollector(blockExecutionInfo);
+        if (blockExecutionInfo.isRequiredRowId()) {
+          LOGGER.info("RowId Restructure based raw collector is used to scan and collect the data");
+          scannerResultAggregator = new RowIdRestructureBasedRawResultCollector(blockExecutionInfo);
+        } else {
+          LOGGER.info("Restructure based raw collector is used to scan and collect the data");
+          scannerResultAggregator = new RestructureBasedRawResultCollector(blockExecutionInfo);
+        }
       } else {
-        LOGGER.info("Row based raw collector is used to scan and collect the data");
-        scannerResultAggregator = new RawBasedResultCollector(blockExecutionInfo);
+        if (blockExecutionInfo.isRequiredRowId()) {
+          LOGGER.info("RowId based raw collector is used to scan and collect the data");
+          scannerResultAggregator = new RowIdRawBasedResultCollector(blockExecutionInfo);
+        } else {
+          LOGGER.info("Row based raw collector is used to scan and collect the data");
+          scannerResultAggregator = new RawBasedResultCollector(blockExecutionInfo);
+        }
       }
     } else if (blockExecutionInfo.isVectorBatchCollector()) {
       if (blockExecutionInfo.isRestructuredBlock()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java
new file mode 100644
index 0000000..28e778f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRestructureBasedRawResultCollector.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+
+/**
+ * It is not a collector, it is just a scanned result holder.
+ * This class returns all the dimensions in a ByteArrayWrapper and appends
+ * blockletNo/PageId/RowId at the end of the row.
+ */
+@InterfaceAudience.Internal
+public class RowIdRestructureBasedRawResultCollector extends RestructureBasedRawResultCollector {
+
+  public RowIdRestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  @Override
+  protected void scanAndFillData(BlockletScannedResult scannedResult, int batchSize,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures) {
+    int numberOfPages = scannedResult.numberOfpages();
+    // loop will exit once the batchSize data has been read or the pages have been exhausted
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter();
+        continue;
+      }
+      int rowCounter = scannedResult.getRowCounter();
+      // getRowCounter holds the total number of rows processed. Calculate the
+      // left-over space through getRowCounter only.
+      int availableRows = currentPageRowCount - rowCounter;
+      // rows available in the current page that can be processed
+      int availableBatchRowCount = Math.min(batchSize, availableRows);
+      // this condition will be true if no data is left in the current block/blocklet to be scanned
+      if (availableBatchRowCount < 1) {
+        break;
+      }
+      if (batchSize > availableRows) {
+        batchSize = batchSize - availableRows;
+      } else {
+        // this is done because in IUD cases the actual rows fetched can be less than the batch
+        // size as some of the rows could have been deleted. So in those cases batchSize needs
+        // to be re-initialized with the left-over value
+        batchSize = 0;
+      }
+      // for every iteration of available rows fill a newly created list of Object[] and add it to
+      // the final list so that there is no mismatch in the counter while filling dimension and
+      // measure data
+      List<Object[]> collectedData = new ArrayList<>(availableBatchRowCount);
+      // fill dimension data
+      fillDimensionData(scannedResult, collectedData, queryMeasures, availableBatchRowCount);
+      fillMeasureData(scannedResult, collectedData);
+      // increment the number of rows scanned in scanned result statistics
+      // incrementScannedResultRowCounter(scannedResult, availableBatchRowCount);
+      // assign the left-over rows to batch size if the number of rows fetched is less
+      // than batchSize
+      if (collectedData.size() < availableBatchRowCount) {
+        batchSize += availableBatchRowCount - collectedData.size();
+      }
+      // add the collected data to the final list
+      listBasedResult.addAll(collectedData);
+    }
+  }
+
+  private void fillDimensionData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures, int batchSize) {
+    long startTime = System.currentTimeMillis();
+    List<byte[]> dictionaryKeyArrayBatch = scannedResult.getDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> noDictionaryKeyArrayBatch =
+        scannedResult.getNoDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> complexTypeKeyArrayBatch = scannedResult.getComplexTypeKeyArrayBatch(batchSize);
+    // it will be the same for one blocklet so it can be computed only once
+    byte[] implicitColumnByteArray = scannedResult.getBlockletId()
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    // Note: the size check in the for loop is for dictionaryKeyArrayBatch as this size can be
+    // less than the batch size in case of IUD scenarios
+    for (int i = 0; i < dictionaryKeyArrayBatch.size(); i++) {
+      // 1 for the ByteArrayWrapper object which will contain dictionary and no dictionary data
+      // 3 for blockletId, pageId, rowId
+      Object[] row = new Object[1 + queryMeasures.length + 3];
+      scannedResult.incrementCounter();
+      row[1 + queryMeasures.length] = scannedResult.getBlockletNumber();
+      row[1 + queryMeasures.length + 1] = scannedResult.getCurrentPageCounter();
+      ByteArrayWrapper wrapper = new ByteArrayWrapper();
+      wrapper.setDictionaryKey(dictionaryKeyArrayBatch.get(i));
+      wrapper.setNoDictionaryKeys(noDictionaryKeyArrayBatch.get(i));
+      wrapper.setComplexTypesKeys(complexTypeKeyArrayBatch.get(i));
+      wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+      row[0] = wrapper;
+      row[1 + queryMeasures.length + 2] = scannedResult.getCurrentRowId();
+      listBasedResult.add(row);
+    }
+    QueryStatistic keyColumnFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME);
+    keyColumnFillingTime.addCountStatistic(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME,
+        keyColumnFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  private void fillMeasureData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult) {
+    long startTime = System.currentTimeMillis();
+    // if the list is not empty after filling the dimension data then only fill the measure data
+    if (!listBasedResult.isEmpty()) {
+      fillMeasureDataBatch(listBasedResult, 1, scannedResult);
+    }
+    QueryStatistic measureFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.MEASURE_FILLING_TIME);
+    measureFillingTime.addCountStatistic(QueryStatisticsConstants.MEASURE_FILLING_TIME,
+        measureFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
index fcecc01..176be6e 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
@@ -20,9 +20,7 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
@@ -33,16 +31,10 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Predicate;
 import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.hadoop.util.hash.Hash;
@@ -58,13 +50,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
   protected List<CarbonBloomFilter> indexBloomFilters;
-  private KeyGenerator keyGenerator;
-  private ColumnarSplitter columnarSplitter;
-  // for the dict/sort/date column, they are encoded in MDK,
-  // this maps the index column name to the index in MDK
-  private Map<String, Integer> indexCol2MdkIdx;
-  // this gives the reverse map to indexCol2MdkIdx
-  private Map<Integer, String> mdkIdx2IndexCol;
 
   AbstractBloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
       Segment segment, String shardName, SegmentProperties segmentProperties,
@@ -79,27 +64,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
     indexBloomFilters = new ArrayList<>(indexColumns.size());
     initDataMapFile();
     resetBloomFilters();
-
-    keyGenerator = segmentProperties.getDimensionKeyGenerator();
-    columnarSplitter = segmentProperties.getFixedLengthKeySplitter();
-    this.indexCol2MdkIdx = new HashMap<>();
-    this.mdkIdx2IndexCol = new HashMap<>();
-    int idx = 0;
-    for (final CarbonDimension dimension : segmentProperties.getDimensions()) {
-      if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) {
-        continue;
-      }
-      boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() {
-        @Override public boolean evaluate(Object object) {
-          return ((CarbonColumn) object).getColName().equalsIgnoreCase(dimension.getColName());
-        }
-      });
-      if (isExistInIndex) {
-        this.indexCol2MdkIdx.put(dimension.getColName(), idx);
-        this.mdkIdx2IndexCol.put(idx, dimension.getColName());
-      }
-      idx++;
-    }
   }
 
   @Override
@@ -172,7 +136,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
     } else {
       if (indexColumns.get(indexColIdx).hasEncoding(Encoding.DICTIONARY) ||
           indexColumns.get(indexColIdx).hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        indexValue = convertDictionaryValue(indexColIdx, (byte[]) value);
+        indexValue = convertDictionaryValue(indexColIdx, value);
       } else {
         indexValue = convertNonDictionaryValue(indexColIdx, (byte[]) value);
       }
@@ -183,38 +147,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
     indexBloomFilters.get(indexColIdx).add(new Key(indexValue));
   }
 
-  protected byte[] convertDictionaryValue(int indexColIdx, byte[] value) {
-    byte[] fakeMdkBytes;
-    // this means that we need to pad some fake bytes
-    // to get the whole MDK in corresponding position
-    if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) {
-      int totalSize = 0;
-      for (int size : columnarSplitter.getBlockKeySize()) {
-        totalSize += size;
-      }
-      fakeMdkBytes = new byte[totalSize];
-
-      // put this bytes to corresponding position
-      int thisKeyIdx = indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName());
-      int destPos = 0;
-      for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) {
-        if (thisKeyIdx == keyIdx) {
-          System.arraycopy(value, 0,
-              fakeMdkBytes, destPos, columnarSplitter.getBlockKeySize()[thisKeyIdx]);
-          break;
-        }
-        destPos += columnarSplitter.getBlockKeySize()[keyIdx];
-      }
-    } else {
-      fakeMdkBytes = value;
-    }
-    // for dict columns including dictionary and date columns
-    // decode value to get the surrogate key
-    int surrogateKey = (int) keyGenerator.getKey(fakeMdkBytes,
-        indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName()));
-    // store the dictionary key in bloom
-    return CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey);
-  }
+  protected abstract byte[] convertDictionaryValue(int indexColIdx, Object value);
 
   protected abstract byte[] convertNonDictionaryValue(int indexColIdx, byte[] value);
 
@@ -276,5 +209,4 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
           currentDataOutStreams.get(indexColId));
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index d9646d5..35ebd20 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -385,7 +385,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
       case ALTER_DROP:
         return true;
       case ALTER_ADD_COLUMN:
-        return true;
+        return false;
       case ALTER_CHANGE_DATATYPE:
         return true;
       case STREAMING:
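As referenced in the commit message above, a condensed sketch of the MDK handling that convertDictionaryValue now splits across the two subclasses: on the load path (BloomDataMapWriter below) the incoming value is the column's slice of MDK bytes, which is padded into a fake full MDK so the KeyGenerator can decode the surrogate key; on the rebuild path (BloomDataMapBuilder below) the value already arrives as a decoded surrogate. The sizes here are illustrative, not taken from a real segment:

    // minimal Scala sketch of the fake-MDK padding (illustrative sizes)
    val blockKeySize = Array(2, 3, 4)        // bytes per dict column in the MDK
    val thisKeyIdx   = 1                     // MDK position of the index column
    val value        = Array[Byte](1, 2, 3)  // page bytes for that column

    val fakeMdkBytes = new Array[Byte](blockKeySize.sum)  // other slots stay zeroed
    val destPos = blockKeySize.take(thisKeyIdx).sum       // bytes before our slot
    System.arraycopy(value, 0, fakeMdkBytes, destPos, blockKeySize(thisKeyIdx))
    // fakeMdkBytes is now [0,0, 1,2,3, 0,0,0,0]; a KeyGenerator built with the
    // same key sizes can decode position 1 to recover the surrogate key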
http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index 7ba8c42..29e3060 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -24,7 +24,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
@@ -75,6 +77,12 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D
   }
 
   @Override
+  protected byte[] convertDictionaryValue(int indexColIdx, Object value) {
+    // the input value from IndexDataMapRebuildRDD is already decoded as a surrogate key
+    return CarbonUtil.getValueAsBytes(DataTypes.INT, value);
+  }
+
+  @Override
   public void close() throws IOException {
     releaseResouce();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index 2769773..1d01e66 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -17,13 +17,22 @@ package org.apache.carbondata.datamap.bloom;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Predicate;
 
 /**
  * BloomDataMap is constructed in CG level (blocklet level).
@@ -34,6 +43,13 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
  */
 @InterfaceAudience.Internal
 public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
+  private KeyGenerator keyGenerator;
+  private ColumnarSplitter columnarSplitter;
+  // the dict/sort/date columns are encoded in the MDK;
+  // this maps the index column name to the index in the MDK
+  private Map<String, Integer> indexCol2MdkIdx;
+  // this gives the reverse map to indexCol2MdkIdx
+  private Map<Integer, String> mdkIdx2IndexCol;
 
   BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
       Segment segment, String shardName, SegmentProperties segmentProperties,
@@ -41,6 +57,27 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
       throws IOException {
     super(tablePath, dataMapName, indexColumns, segment, shardName, segmentProperties,
         bloomFilterSize, bloomFilterFpp, compressBloom);
+
+    keyGenerator = segmentProperties.getDimensionKeyGenerator();
+    columnarSplitter = segmentProperties.getFixedLengthKeySplitter();
+    this.indexCol2MdkIdx = new HashMap<>();
+    this.mdkIdx2IndexCol = new HashMap<>();
+    int idx = 0;
+    for (final CarbonDimension dimension : segmentProperties.getDimensions()) {
+      if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) {
+        continue;
+      }
+      boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() {
+        @Override public boolean evaluate(Object object) {
+          return ((CarbonColumn) object).getColName().equalsIgnoreCase(dimension.getColName());
+        }
+      });
+      if (isExistInIndex) {
+        this.indexCol2MdkIdx.put(dimension.getColName(), idx);
+        this.mdkIdx2IndexCol.put(idx, dimension.getColName());
+      }
+      idx++;
+    }
   }
 
   protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
@@ -50,4 +87,39 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
       return DataConvertUtil.getRawBytes(value);
     }
   }
+
+  @Override
+  protected byte[] convertDictionaryValue(int indexColIdx, Object value) {
+    // the input value from onPageAdded in the load process is byte[]
+    byte[] fakeMdkBytes;
+    // this means that we need to pad some fake bytes
+    // to get the whole MDK in the corresponding position
+    if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) {
+      int totalSize = 0;
+      for (int size : columnarSplitter.getBlockKeySize()) {
+        totalSize += size;
+      }
+      fakeMdkBytes = new byte[totalSize];
+
+      // put these bytes into the corresponding position
+      int thisKeyIdx = indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName());
+      int destPos = 0;
+      for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) {
+        if (thisKeyIdx == keyIdx) {
+          System.arraycopy(value, 0,
+              fakeMdkBytes, destPos, columnarSplitter.getBlockKeySize()[thisKeyIdx]);
+          break;
+        }
+        destPos += columnarSplitter.getBlockKeySize()[keyIdx];
+      }
+    } else {
+      fakeMdkBytes = (byte[]) value;
+    }
+    // for dict columns including dictionary and date columns,
+    // decode the value to get the surrogate key
+    int surrogateKey = (int) keyGenerator.getKey(fakeMdkBytes,
+        indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName()));
+    // store the dictionary key in the bloom filter
+    return CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 85466f1..70e5cba 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -23,7 +23,9 @@ import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
+import org.apache.commons.lang3.ArrayUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
@@ -33,19 +35,22 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher
-import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter
+import org.apache.carbondata.core.keygenerator.KeyGenerator
+import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.TaskMetricsMap
+import org.apache.carbondata.core.util.{CarbonUtil, TaskMetricsMap}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
@@ -55,6 +60,7 @@ import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
 import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
 import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
+
 /**
  * Helper object to rebuild the index DataMap
  */
@@ -154,89 +160,114 @@ class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[
  */
 class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Array[CarbonColumn])
   extends CarbonReadSupport[Array[Object]] {
-  var columnarSplitter: ColumnarSplitter = _
+  var dimensionKeyGenerator: KeyGenerator = _
+  // for the dictionary dimensions
+  var indexCol2IdxInDictArray: Map[String, Int] = Map()
   // for the non dictionary dimensions
   var indexCol2IdxInNoDictArray: Map[String, Int] = Map()
   // for the measures
   var indexCol2IdxInMeasureArray: Map[String, Int] = Map()
-  // for the dictionary/date dimensions
-  var dictIndexCol2MdkIndex: Map[String, Int] = Map()
-  var mdkIndex2DictIndexCol: Map[Int, String] = Map()
-  var existDim = false
+
+  /**
+   * The rebuild process gets its data from a query. If some columns were added to the table
+   * but do not exist in this segment, they are filled with default values, and a new key is
+   * generated for the dict dimensions. Here we prepare the key generator in the same way as
+   * `RowIdRestructureBasedRawResultCollector` to get the surrogate value of a dict column
+   * from the query result, so we do not need to make a fake MDK to split when adding a row
+   * to the datamap.
+   */
+  def prepareKeyGenForDictIndexColumns(carbonTable: CarbonTable,
+      dictIndexColumns: ListBuffer[CarbonColumn]): Unit = {
+
+    val columnCardinality = new util.ArrayList[Integer](dictIndexColumns.length)
+    val columnPartitioner = new util.ArrayList[Integer](dictIndexColumns.length)
+
+    dictIndexColumns.foreach { col =>
+      val dim = carbonTable.getDimensionByName(carbonTable.getTableName, col.getColName)
+      val currentBlockDimension = segmentProperties.getDimensionFromCurrentBlock(dim)
+      if (null != currentBlockDimension) {
+        columnCardinality.add(segmentProperties.getDimColumnsCardinality.apply(
+          currentBlockDimension.getKeyOrdinal))
+        columnPartitioner.add(segmentProperties.getDimensionPartitions.apply(
+          currentBlockDimension.getKeyOrdinal
+        ))
+      } else {
+        columnPartitioner.add(1)
+        if (col.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          columnCardinality.add(Integer.MAX_VALUE)
+        } else {
+          val defaultValue = col.getDefaultValue
+          if (null != defaultValue) {
+            columnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1)
+          } else {
+            columnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY)
+          }
+        }
+      }
+    }
+
+    if (!columnCardinality.isEmpty) {
+      val latestColumnCardinality = ArrayUtils.toPrimitive(columnCardinality.toArray(
+        new Array[Integer](columnCardinality.size)))
+      val latestColumnPartitioner = ArrayUtils.toPrimitive(columnPartitioner.toArray(
+        new Array[Integer](columnPartitioner.size)))
+      val dimensionBitLength = CarbonUtil.getDimensionBitLength(
+        latestColumnCardinality, latestColumnPartitioner)
+      this.dimensionKeyGenerator = new MultiDimKeyVarLengthGenerator(dimensionBitLength)
+    }
+  }
 
   override def initialize(carbonColumns: Array[CarbonColumn],
       carbonTable: CarbonTable): Unit = {
-    this.columnarSplitter = segmentProperties.getFixedLengthKeySplitter
+    val dictIndexColumns = new ListBuffer[CarbonColumn]()
+
+    // prepare the index info to extract data from the query result
     indexColumns.foreach { col =>
       if (col.isDimension) {
         val dim = carbonTable.getDimensionByName(carbonTable.getTableName, col.getColName)
         if (!dim.isGlobalDictionaryEncoding && !dim.isDirectDictionaryEncoding) {
           indexCol2IdxInNoDictArray =
             indexCol2IdxInNoDictArray + (col.getColName -> indexCol2IdxInNoDictArray.size)
+        } else {
+          dictIndexColumns.append(col)
+          indexCol2IdxInDictArray =
+            indexCol2IdxInDictArray + (col.getColName -> indexCol2IdxInDictArray.size)
         }
       } else {
         indexCol2IdxInMeasureArray =
           indexCol2IdxInMeasureArray + (col.getColName -> indexCol2IdxInMeasureArray.size)
       }
     }
-    dictIndexCol2MdkIndex = segmentProperties.getDimensions.asScala
-      .filter(col => col.isGlobalDictionaryEncoding || col.isDirectDictionaryEncoding)
-      .map(_.getColName)
-      .zipWithIndex
-      .filter(p => indexColumns.exists(c => c.getColName.equalsIgnoreCase(p._1)))
-      .toMap
-    mdkIndex2DictIndexCol = dictIndexCol2MdkIndex.map(p => (p._2, p._1))
-    existDim = indexCol2IdxInNoDictArray.nonEmpty || dictIndexCol2MdkIndex.nonEmpty
+
+    if (dictIndexColumns.size > 0) {
+      prepareKeyGenForDictIndexColumns(carbonTable, dictIndexColumns)
+    }
   }
 
   /**
    * input: all the dimensions are bundled in one ByteArrayWrapper in position 0,
-   * then comes the measures one by one;
+   * then come the measures one by one; the last 3 elements are the block/page/row ids
    * output: all the dimensions and measures come one after another
    */
   override def readRow(data: Array[Object]): Array[Object] = {
-    val dictArray = if (existDim) {
-      val dictKeys = data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey
-      // note that the index column may only contains a portion of all the dict columns, so we
-      // need to pad fake bytes to dict keys in order to reconstruct value later
-      if (columnarSplitter.getBlockKeySize.length > dictIndexCol2MdkIndex.size) {
-        val res = new Array[Byte](columnarSplitter.getBlockKeySize.sum)
-        var startPos = 0
-        var desPos = 0
-        columnarSplitter.getBlockKeySize.indices.foreach { idx =>
-          if (mdkIndex2DictIndexCol.contains(idx)) {
-            val size = columnarSplitter.getBlockKeySize.apply(idx)
-            System.arraycopy(dictKeys, startPos, res, desPos, size)
-            startPos += size
-          }
-          desPos += columnarSplitter.getBlockKeySize.apply(idx)
-        }
-        Option(res)
-      } else {
-        Option(dictKeys)
-      }
-    } else {
-      None
+    var surrogateKeys = new Array[Long](0)
+    if (null != dimensionKeyGenerator) {
+      surrogateKeys = dimensionKeyGenerator.getKeyArray(
+        data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey)
     }
-    val dictKeys = if (existDim) {
-      Option(columnarSplitter.splitKey(dictArray.get))
-    } else {
-      None
-    }
+
+    // fill the return row from data
     val rtn = new Array[Object](indexColumns.length + 3)
-
     indexColumns.zipWithIndex.foreach { case (col, i) =>
-      rtn(i) = if (dictIndexCol2MdkIndex.contains(col.getColName)) {
-        dictKeys.get(dictIndexCol2MdkIndex.get(col.getColName).get)
+      rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
+        surrogateKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
       } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
         data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
-          indexCol2IdxInNoDictArray.apply(col.getColName))
+          indexCol2IdxInNoDictArray(col.getColName))
       } else {
         // measures start from 1
-        data(1 + indexCol2IdxInMeasureArray.apply(col.getColName))
+        data(1 + indexCol2IdxInMeasureArray(col.getColName))
       }
     }
     rtn(indexColumns.length) = data(data.length - 3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/81038f55/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index df5ee18..0b0c665 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -512,6 +512,102 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
       "BloomFilter datamap does not support complex datatype column"))
   }
 
+  test("test create bloom datamap on newly added column") {
+    val datamap1 = "datamap1"
+    val datamap2 = "datamap2"
+    val datamap3 = "datamap3"
+
+    // create a table with dict/noDict/measure columns
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128',
+         | 'DICTIONARY_INCLUDE'='s1,s2')
+         | """.stripMargin)
+
+    // load data into the table (segment0)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+
+    // create a simple datamap on segment0
+    sql(
+      s"""
+         | CREATE DATAMAP $datamap1 ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='id', 'BLOOM_SIZE'='640000')
+       """.stripMargin)
+
+    // add some columns including dict/noDict/measure columns
+    sql(
+      s"""
+         | ALTER TABLE $bloomDMSampleTable
+         | ADD COLUMNS(num1 INT, dictString STRING, noDictString STRING)
+         | TBLPROPERTIES('DEFAULT.VALUE.num1'='999', 'DEFAULT.VALUE.dictString'='old',
+         | 'DICTIONARY_INCLUDE'='dictString'
+         | )
+       """.stripMargin)
+
+    // load data into the table (segment1)
+    sql(
+      s"""
+         | INSERT INTO TABLE $bloomDMSampleTable VALUES
+         | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',0,'S01','S02'),
+         | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',4,'S11','S12'),
+         | (102,'name2','city2',12,'s12','s22','s32','s42','s52','s62','s72','s82',5,'S21','S22')
+       """.stripMargin)
+
+    // check the data after the columns were added
+    var res = sql(
+      s"""
+         | SELECT name, city, num1, dictString, noDictString
+         | FROM $bloomDMSampleTable
+         | WHERE id = 101
+         | """.stripMargin)
+    checkExistence(res, true, "999", "null")
+
+    // create a datamap on the newly added columns
+    sql(
+      s"""
+         | CREATE DATAMAP $datamap2 ON TABLE $bloomDMSampleTable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='s1,dictString,s8,noDictString,age,num1',
+         | 'BLOOM_SIZE'='640000')
+       """.stripMargin)
+
+    // load data into the table (segment2)
+    sql(
+      s"""
+         | INSERT INTO TABLE $bloomDMSampleTable VALUES
+         | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',1,'S01','S02'),
+         | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',2,'S11','S12'),
+         | (102,'name2','city1',12,'s12','s22','s32','s42','s52','s62','s72','s82',3,'S21','S22')
+       """.stripMargin)
+
+    var explainString = sql(
+      s"""
+         | explain SELECT id, name, num1, dictString
+         | FROM $bloomDMSampleTable
+         | WHERE num1 = 1
+       """.stripMargin).collect()
+    assert(explainString(0).getString(0).contains(
+      "- name: datamap2\n - provider: bloomfilter\n - skipped blocklets: 1"))
+
+    explainString = sql(
+      s"""
+         | explain SELECT id, name, num1, dictString
+         | FROM $bloomDMSampleTable
+         | WHERE dictString = 'S21'
+       """.stripMargin).collect()
+    assert(explainString(0).getString(0).contains(
+      "- name: datamap2\n - provider: bloomfilter\n - skipped blocklets: 0"))
+
+  }
+
   override protected def afterAll(): Unit = {
     deleteFile(bigFile)
     deleteFile(smallFile)
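As a reading aid for the RawBytesReadSupport rework above, a condensed sketch of the key-generator round trip used during rebuild. This is a standalone illustration with made-up cardinalities, and it assumes the core KeyGenerator overloads (generateKey on Array[Int], getKeyArray on Array[Byte]) behave as they are used elsewhere in the diff:

    import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator
    import org.apache.carbondata.core.util.CarbonUtil

    // one dict column present in the segment (cardinality taken from
    // SegmentProperties) and one newly added column (default-value
    // cardinality + 1), as in prepareKeyGenForDictIndexColumns above
    val cardinality = Array(1000, 3)
    val partitioner = Array(1, 1)
    val bitLengths = CarbonUtil.getDimensionBitLength(cardinality, partitioner)
    val keyGen = new MultiDimKeyVarLengthGenerator(bitLengths)

    // readRow feeds each raw row's dictionary bytes into getKeyArray and picks
    // every dict index column's surrogate via indexCol2IdxInDictArray
    val mdk = keyGen.generateKey(Array(42, 1))   // encode two surrogate keys
    val surrogates = keyGen.getKeyArray(mdk)     // decodes back to Array(42L, 1L)
    println(surrogates.mkString(","))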