[HotFix][CARBONDATA-2788][BloomDataMap] Fix bugs in incorrect query result with bloom datamap
This PR solves two problems which affect the correctness of queries on bloom. Revert PR2539: After reviewing the code, we found that the modification in PR2539 is not needed, so we reverted that PR. Overflow bug in blocklet count: Carbondata stores the blocklet count for each block in a byte data type; when a block contains more than 128 blocklets, it overflows the byte limit. Here we change the data type to short. For cache_level=block, after pruning by the main BlockDataMap, the blockletNo in Blocklet is -1, which indicates that the following procedure will scan the whole block -- all the blocklets in the block. So, when doing intersection with the pruned result from BloomDataMap, we need to take care of these blocklets. In this implementation, we added the result from BloomDataMap based on the blocklet's existence in BlockDataMap. This closes #2565 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1cea4d33 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1cea4d33 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1cea4d33 Branch: refs/heads/external-format Commit: 1cea4d33ff9096fab5d38a1403e1e78c2fa2d6dc Parents: 34ca021 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Thu Jul 26 23:22:58 2018 +0800 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Wed Aug 1 10:40:07 2018 +0530 ---------------------------------------------------------------------- .../indexstore/blockletindex/BlockDataMap.java | 24 +++++++--- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../hadoop/api/CarbonInputFormat.java | 28 ++++++++++-- .../lucene/LuceneFineGrainDataMapSuite.scala | 14 +++--- .../datamap/IndexDataMapRebuildRDD.scala | 10 ++++- .../BloomCoarseGrainDataMapFunctionSuite.scala | 46 +++++++++++++++++++- .../bloom/BloomCoarseGrainDataMapSuite.scala | 2 +- 7 files changed, 104 insertions(+), 22 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 82006c3..f4bb58e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.indexstore.blockletindex; import java.io.*; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; import java.util.List; @@ -58,7 +59,6 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; @@ -248,8 +248,8 @@ public class BlockDataMap extends CoarseGrainDataMap byte[][] blockMinValues = null; byte[][] blockMaxValues = null; DataMapRowImpl summaryRow = null; - List<Byte> blockletCountInEachBlock = new ArrayList<>(indexInfo.size()); - byte totalBlockletsInOneBlock = 0; + List<Short> blockletCountInEachBlock = new ArrayList<>(indexInfo.size()); + short totalBlockletsInOneBlock = 0; boolean isLastFileFooterEntryNeedToBeAdded = false; CarbonRowSchema[] schema = getFileFooterEntrySchema(); for (DataFileFooter fileFooter : indexInfo) { @@ -318,13 +318,22 @@ public class BlockDataMap extends CoarseGrainDataMap blockMinValues, blockMaxValues); blockletCountInEachBlock.add(totalBlockletsInOneBlock); } - byte[] blockletCount = ArrayUtils - 
.toPrimitive(blockletCountInEachBlock.toArray(new Byte[blockletCountInEachBlock.size()])); + byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountInEachBlock); // blocklet count index is the last index summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1); return summaryRow; } + private byte[] convertRowCountFromShortToByteArray(List<Short> blockletCountInEachBlock) { + int bufferSize = blockletCountInEachBlock.size() * 2; + ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); + for (Short blockletCount : blockletCountInEachBlock) { + byteBuffer.putShort(blockletCount); + } + byteBuffer.rewind(); + return byteBuffer.array(); + } + protected void setLocations(String[] locations, DataMapRow row, int ordinal) throws UnsupportedEncodingException { // Add location info @@ -696,7 +705,7 @@ public class BlockDataMap extends CoarseGrainDataMap relativeBlockletId = (short) absoluteBlockletId; } else { int diff = absoluteBlockletId; - byte[] blockletRowCountForEachBlock = getBlockletRowCountForEachBlock(); + ByteBuffer byteBuffer = ByteBuffer.wrap(getBlockletRowCountForEachBlock()); // Example: absoluteBlockletID = 17, blockletRowCountForEachBlock = {4,3,2,5,7} // step1: diff = 17-4, diff = 13 // step2: diff = 13-3, diff = 10 @@ -704,7 +713,8 @@ public class BlockDataMap extends CoarseGrainDataMap // step4: diff = 8-5, diff = 3 // step5: diff = 3-7, diff = -4 (satisfies <= 0) // step6: relativeBlockletId = -4+7, relativeBlockletId = 3 (4th index starting from 0) - for (byte blockletCount : blockletRowCountForEachBlock) { + while (byteBuffer.hasRemaining()) { + short blockletCount = byteBuffer.getShort(); diff = diff - blockletCount; if (diff < 0) { relativeBlockletId = (short) (diff + blockletCount); http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff 
--git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 643cc45..4dd78ee 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -71,7 +71,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory /** * variable for cache level BLOCKLET */ - private static final String CACHE_LEVEL_BLOCKLET = "BLOCKLET"; + public static final String CACHE_LEVEL_BLOCKLET = "BLOCKLET"; public static final DataMapSchema DATA_MAP_SCHEMA = new DataMapSchema(NAME, BlockletDataMapFactory.class.getName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 356dd5a..eeb3ae8 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapWrapperSimpleInfo; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; @@ -53,6 +54,7 @@ 
import org.apache.carbondata.core.scan.model.QueryModelBuilder; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; @@ -462,8 +464,7 @@ m filterExpression } // since index datamap prune in segment scope, // the result need to intersect with previous pruned result - prunedBlocklets = (List) CollectionUtils.intersection( - cgPrunedBlocklets, prunedBlocklets); + prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets, cgPrunedBlocklets); ExplainCollector.recordCGDataMapPruning( DataMapWrapperSimpleInfo.fromDataMapWrapper(cgDataMapExprWrapper), prunedBlocklets.size()); @@ -482,8 +483,8 @@ m filterExpression resolver, segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune); // note that the 'fgPrunedBlocklets' has extra datamap related info compared with // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets' - prunedBlocklets = (List) CollectionUtils.intersection(fgPrunedBlocklets, - prunedBlocklets); + prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets, + fgPrunedBlocklets); ExplainCollector.recordFGDataMapPruning( DataMapWrapperSimpleInfo.fromDataMapWrapper(fgDataMapExprWrapper), prunedBlocklets.size()); @@ -492,6 +493,25 @@ m filterExpression return prunedBlocklets; } + private List<ExtendedBlocklet> intersectFilteredBlocklets(CarbonTable carbonTable, + List<ExtendedBlocklet> previousDataMapPrunedBlocklets, + List<ExtendedBlocklet> otherDataMapPrunedBlocklets) { + List<ExtendedBlocklet> prunedBlocklets = null; + if (BlockletDataMapUtil.isCacheLevelBlock( + carbonTable, BlockletDataMapFactory.CACHE_LEVEL_BLOCKLET)) { + 
prunedBlocklets = new ArrayList<>(); + for (ExtendedBlocklet otherBlocklet : otherDataMapPrunedBlocklets) { + if (previousDataMapPrunedBlocklets.contains(otherBlocklet)) { + prunedBlocklets.add(otherBlocklet); + } + } + } else { + prunedBlocklets = (List) CollectionUtils + .intersection(otherDataMapPrunedBlocklets, previousDataMapPrunedBlocklets); + } + return prunedBlocklets; + } + /** * Prune the segments from the already pruned blocklets. * @param segments http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala index a380f04..54cad00 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -371,7 +371,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { """ | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT) | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'CACHE_LEVEL'='BLOCKLET') """.stripMargin) sql( s""" @@ -571,7 +571,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { """ | CREATE TABLE main(id INT, name STRING, city STRING, age INT) | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'CACHE_LEVEL'='BLOCKLET') """.stripMargin) sql( s""" @@ -664,7 +664,7 @@ class 
LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { """ | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 'CACHE_LEVEL'='BLOCKLET') """.stripMargin) sql( s""" @@ -803,7 +803,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { sql("drop table if exists table_stop") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS, "false") - sql("create table table_stop(suggestion string,goal string) stored by 'carbondata'") + sql("create table table_stop(suggestion string,goal string) stored by 'carbondata' TBLPROPERTIES('CACHE_LEVEL'='BLOCKLET')") sql( "create datamap stop_dm on table table_stop using 'lucene' DMPROPERTIES('index_columns'='suggestion')") sql("insert into table_stop select 'The is the stop word','abcde'") @@ -821,13 +821,15 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { """ | CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT) | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 'autorefreshdatamap' = 'false') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', + | 'autorefreshdatamap' = 'false', 'CACHE_LEVEL'='BLOCKLET') """.stripMargin) sql( """ | CREATE TABLE datamap_copy(id INT, name STRING, city STRING, age INT) | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 'autorefreshdatamap' = 'false') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', + | 'autorefreshdatamap' = 'false', 'CACHE_LEVEL'='BLOCKLET') """.stripMargin) sql("insert into datamap_test4 select 1,'name','city',20") sql("insert into datamap_test4 select 2,'name1','city1',20") 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index e4d5b26..2d684bf 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -357,13 +357,21 @@ class IndexDataMapRebuildRDD[K, V]( // skip clear datamap and we will do this adter rebuild reader.setSkipClearDataMapAtClose(true) + // Note that blockletId in rowWithPosition does not work properly, + // here we use another way to generate it. + var blockletId = 0 + var firstRow = true while (reader.nextKeyValue()) { val rowWithPosition = reader.getCurrentValue val size = rowWithPosition.length - val blockletId = rowWithPosition(size - 3).asInstanceOf[Int] val pageId = rowWithPosition(size - 2).asInstanceOf[Int] val rowId = rowWithPosition(size - 1).asInstanceOf[Int] + if (!firstRow && pageId == 0 && rowId == 0) { + blockletId = blockletId + 1 + } else { + firstRow = false + } refresher.addRow(blockletId, pageId, rowId, rowWithPosition) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala index 92943b2..496a506 100644 --- 
a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala @@ -1,14 +1,15 @@ package org.apache.carbondata.datamap.bloom import java.io.File +import java.util.{Random, UUID} import org.apache.commons.io.FileUtils -import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.{CarbonEnv, SaveMode} import org.apache.spark.sql.test.Spark2TestQueryExecutor import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -790,6 +791,47 @@ class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfte assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindex"), true).asScala.nonEmpty) } + // two blocklets in one block are hit by bloom datamap while block cache level hit this block + test("CARBONDATA-2788: enable block cache level and bloom datamap") { + // minimum per page is 2000 rows + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "2000") + // minimum per blocklet is 16MB + CarbonProperties.getInstance().addProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, "16") + // these lines will result in 3 blocklets in one block and bloom will hit at least 2 of them + val lines = 100000 + sql("drop table if exists test_rcd").collect() + val r = new Random() + import sqlContext.implicits._ + val df = sqlContext.sparkContext.parallelize(1 to lines) + .map(x => ("No." 
+ r.nextInt(10000), "country" + x % 10000, "city" + x % 10000, x % 10000, + UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString, + UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString, + UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString, + UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString)) + .toDF("ID", "country", "city", "population", + "random1", "random2", "random3", + "random4", "random5", "random6", + "random7", "random8", "random9", + "random10", "random11", "random12") + df.write + .format("carbondata") + .option("tableName", "test_rcd") + .option("SORT_COLUMNS", "id") + .option("SORT_SCOPE", "LOCAL_SORT") + .mode(SaveMode.Overwrite) + .save() + + val withoutBloom = sql("select count(*) from test_rcd where city = 'city40'").collect().toSeq + sql("CREATE DATAMAP dm_rcd ON TABLE test_rcd " + + "USING 'bloomfilter' DMPROPERTIES " + + "('INDEX_COLUMNS' = 'city', 'BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')") + checkAnswer(sql("select count(*) from test_rcd where city = 'city40'"), withoutBloom) + + sql("drop table if exists test_rcd").collect() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL) + } + override def afterAll(): Unit = { deleteFile(bigFile) sql(s"DROP TABLE IF EXISTS $normalTable") http://git-wip-us.apache.org/repos/asf/carbondata/blob/1cea4d33/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala index 7871518..1d57268 100644 --- 
a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala @@ -563,7 +563,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128', - | 'DICTIONARY_INCLUDE'='s1,s2') + | 'DICTIONARY_INCLUDE'='s1,s2', 'CACHE_LEVEL'='BLOCKLET') | """.stripMargin) // load data into table (segment0)