http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 78cfc36..41af792 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -48,6 +48,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; @@ -59,6 +60,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSche import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.FileHolder; import org.apache.carbondata.core.datastorage.store.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer; import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel; @@ -890,6 +892,31 @@ public final class CarbonUtil { } /** + * The method calculate the B-Tree metadata size. 
+ * @param filePath + * @param blockOffset + * @param blockLength + * @return + */ + public static long calculateMetaSize(String filePath, long blockOffset, long blockLength) { + FileHolder fileReader = null; + try { + long completeBlockLength = blockOffset + blockLength; + long footerPointer = completeBlockLength - 8; + fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)); + long actualFooterOffset = fileReader.readLong(filePath, footerPointer); + long size = footerPointer - actualFooterOffset; + return size; + } + finally { + if(null != fileReader) { + fileReader.finish(); + } + } + } + + + /** * Below method will be used to get the surrogate key * * @param data actual data @@ -909,6 +936,44 @@ public final class CarbonUtil { } /** + * The method returns the size of the carbon index file for a particular taskId + * + * @param taskId + * @param tableBlockInfoList + * @param absoluteTableIdentifier + */ + public static long calculateDriverBTreeSize(String taskId, String bucketNumber, + List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier) { + // need to sort the block info list based on task in ascending order so + // it will be in sync with block index read from file + Collections.sort(tableBlockInfoList); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + // getting the index file path + //TODO need to pass proper partition number when partition will be supported + String carbonIndexFilePath = carbonTablePath + .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + bucketNumber); + CarbonFile carbonFile = FileFactory + .getCarbonFile(carbonIndexFilePath, FileFactory.getFileType(carbonIndexFilePath)); + // in case of carbonIndex file whole file is meta only so reading complete file. 
+ return carbonFile.getSize(); + } + + /** + * This method will clear the B-Tree Cache in executors for the given list of blocks + * + * @param dataBlocks + */ + public static void clearBlockCache(List<AbstractIndex> dataBlocks) { + if (null != dataBlocks) { + for (AbstractIndex blocks : dataBlocks) { + blocks.clear(); + } + } + } + /** * Thread to delete the tables * */
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java index f6df175..0ceb80f 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java @@ -30,11 +30,15 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.common.logging.impl.StandardLogService; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.datastore.BlockIndexStore; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; -import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; @@ -47,6 +51,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import 
org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; import org.apache.carbondata.scan.executor.QueryExecutor; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.executor.infos.AggregatorInfo; @@ -106,15 +111,19 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // query execution Collections.sort(queryModel.getTableBlockInfos()); // get the table blocks - BlockIndexStore blockLoaderInstance = BlockIndexStore.getInstance(); + CacheProvider cacheProvider = CacheProvider.getInstance(); + BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache = + (BlockIndexStore) cacheProvider + .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath()); // remove the invalid table blocks, block which is deleted or compacted - blockLoaderInstance.removeTableBlocks(queryModel.getInvalidSegmentIds(), + cache.removeTableBlocks(queryModel.getInvalidSegmentIds(), queryModel.getAbsoluteTableIdentifier()); try { - queryProperties.dataBlocks = blockLoaderInstance - .loadAndGetBlocks(queryModel.getTableBlockInfos(), + List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = + prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(), queryModel.getAbsoluteTableIdentifier()); - } catch (IndexBuilderException e) { + queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers); + } catch (CarbonUtilException e) { throw new QueryExecutionException(e); } queryStatistic @@ -170,6 +179,17 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryProperties.sortDimIndexes = new byte[queryModel.getQueryDimension().size()]; } + private List<TableBlockUniqueIdentifier> prepareTableBlockUniqueIdentifier( + List<TableBlockInfo> tableBlockInfos, AbsoluteTableIdentifier absoluteTableIdentifier) { + 
List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = + new ArrayList<>(tableBlockInfos.size()); + for (TableBlockInfo blockInfo : tableBlockInfos) { + tableBlockUniqueIdentifiers + .add(new TableBlockUniqueIdentifier(absoluteTableIdentifier, blockInfo)); + } + return tableBlockUniqueIdentifiers; + } + /** * Below method will be used to get the key structure info for the query * @@ -470,6 +490,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { * @throws QueryExecutionException */ @Override public void finish() throws QueryExecutionException { + CarbonUtil.clearBlockCache(queryProperties.dataBlocks); if (null != queryProperties.executorService) { queryProperties.executorService.shutdown(); try { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java index c6e9ff7..ff7a93c 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -59,6 +59,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { * execution info of the block */ protected List<BlockExecutionInfo> blockExecutionInfos; + /** * file reader which will be used to execute the query */ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java b/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java index 7c5bd44..52cec7d 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java @@ -40,7 +40,7 @@ public class CacheProviderTest { @Before public void setUp() throws Exception { // enable lru cache by setting cache size CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, "10"); + .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); } @Test public void getInstance() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java index 559f653..953e24f 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java @@ -70,7 +70,7 @@ public class ForwardDictionaryCacheTest extends AbstractDictionaryCacheTest { private void createDictionaryCacheObject() { // enable lru cache by setting cache size CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, "10"); + .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); CacheProvider cacheProvider = CacheProvider.getInstance(); forwardDictionaryCache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, this.carbonStorePath); 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java index 3d817b6..c526f45 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java @@ -72,7 +72,7 @@ public class ReverseDictionaryCacheTest extends AbstractDictionaryCacheTest { private void createDictionaryCacheObject() { // enable lru cache by setting cache size CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE, "10"); + .addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); CacheProvider cacheProvider = CacheProvider.getInstance(); cacheProvider.dropAllCache(); reverseDictionaryCache = http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java index 722d030..3490917 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java @@ -24,19 +24,22 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.carbondata.core.cache.CacheProvider; +import 
org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.ColumnarFormatVersion; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndex; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo; import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; import mockit.Mock; import mockit.MockUp; @@ -56,7 +59,8 @@ public class SegmentTaskIndexStoreTest { private static AbsoluteTableIdentifier absoluteTableIdentifier; @BeforeClass public static void setUp() { - taskIndexStore = SegmentTaskIndexStore.getInstance(); + CacheProvider cacheProvider = CacheProvider.getInstance(); + taskIndexStore = (SegmentTaskIndexStore) cacheProvider.createCache(CacheType.DRIVER_BTREE, ""); tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L, ColumnarFormatVersion.valueOf(version)); absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp", @@ -81,7 +85,7 @@ public class SegmentTaskIndexStoreTest { return footerList; } - @Test public void loadAndGetTaskIdToSegmentsMap() throws IndexBuilderException { + @Test public void loadAndGetTaskIdToSegmentsMap() throws CarbonUtilException { new MockUp<CarbonTablePath.DataFileUtil>() { @Mock 
String getTaskNo(String carbonDataFileName) { return "100"; @@ -97,23 +101,40 @@ public class SegmentTaskIndexStoreTest { } }; + new MockUp<CarbonTablePath>() { + @Mock public String getCarbonIndexFilePath(final String taskId, final String partitionId, + final String segmentId, final String bucketNumber) { + return "/src/test/resources"; + } + }; + new MockUp<SegmentTaskIndex>() { @Mock void buildIndex(List<DataFileFooter> footerList) { } }; + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "SG100"); - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result = - taskIndexStore.loadAndGetTaskIdToSegmentsMap(new HashMap<String, List<TableBlockInfo>>() {{ + HashMap<String, List<TableBlockInfo>> segmentToTableBlocksInfos = + new HashMap<String, List<TableBlockInfo>>() {{ put("SG100", Arrays.asList(tableBlockInfo)); - }}, absoluteTableIdentifier); + }}; + tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); + Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result = + taskIndexStore.get(tableSegmentUniqueIdentifier).getTaskIdToTableSegmentMap(); assertEquals(result.size(), 1); assertTrue(result.containsKey(new SegmentTaskIndexStore.TaskBucketHolder("100", "0"))); } @Test public void checkExistenceOfSegmentBTree() { - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result = - taskIndexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "SG100"); + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "SG100"); + SegmentTaskIndexWrapper segmentTaskIndexWrapper = + taskIndexStore.getIfPresent(tableSegmentUniqueIdentifier); + Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result = segmentTaskIndexWrapper != null ? 
+ segmentTaskIndexWrapper.getTaskIdToTableSegmentMap() : + null; assertNull(result); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java index 6d90a36..5e389ea 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java @@ -29,13 +29,13 @@ public class BlockInfoTest { static BlockInfo blockInfo; @BeforeClass public static void setup() { - blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); + blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); } @Test public void hashCodeTest() { int res = blockInfo.hashCode(); - int expectedResult = -520590451; - assertEquals(res, expectedResult); + int expectedResult = 1694768249; + assertEquals(expectedResult, res); } @Test public void equalsTestwithSameObject() { @@ -45,7 +45,7 @@ public class BlockInfoTest { @Test public void equalsTestWithSimilarObject() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (res); } @@ -62,28 +62,28 @@ public class BlockInfoTest { @Test public void equalsTestWithDifferentSegmentId() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new 
TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDifferentOffset() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDifferentBlockLength() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDiffFilePath() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); Boolean res = blockInfoTest.equals(blockInfo); assert (!res); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index 14d9248..4eed732 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -210,7 +210,7 @@ public class CarbonUtilTest { } }; String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath"); - assertEquals(badLogStoreLocation, 
"../unibi-solutions/system/carbon/badRecords/badLogPath"); + assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath"); } @Test public void testToDeleteFoldersAndFilesForCarbonFileSilently() http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java index b7c48d7..38c85fe 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java @@ -153,7 +153,7 @@ public class DataFileFooterConverterTest { } }; String[] arr = { "a", "b", "c" }; - TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfo = new TableBlockInfo("/file.carbondata", 3, "id", arr, 3, ColumnarFormatVersion.V1); tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3); List<TableBlockInfo> tableBlockInfoList = new ArrayList<>(); tableBlockInfoList.add(tableBlockInfo); @@ -255,7 +255,7 @@ public class DataFileFooterConverterTest { segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols()); dataFileFooter.setNumberOfRows(3); dataFileFooter.setSegmentInfo(segmentInfo); - TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); + TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info); assertEquals(result.getNumberOfRows(), 3); } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java new file mode 100644 index 0000000..cbd3511 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.hadoop; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonUtilException; + +/** + * CacheClient : Class used to request the segments cache + */ +public class CacheClient { + /** + * List of segments + */ + private List<TableSegmentUniqueIdentifier> segmentList = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + /** + * absolute table identifier + */ + private AbsoluteTableIdentifier absoluteTableIdentifier; + + private SegmentTaskIndexStore segmentCache; + + /** + * @param absoluteTableIdentifier + */ + public CacheClient(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + segmentCache = (SegmentTaskIndexStore) CacheProvider.getInstance() + .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath()); + } + + /** + * The method returns the SegmentTaskIndexWrapper from the segments cache + * + * @param tableSegmentUniqueIdentifier + * @return + * @throws CarbonUtilException + */ + public SegmentTaskIndexWrapper getSegmentTaskIndexWrapper( + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws CarbonUtilException { + SegmentTaskIndexWrapper segmentTaskIndexWrapper; + if (null == tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos()) { + segmentTaskIndexWrapper = segmentCache.getIfPresent(tableSegmentUniqueIdentifier); + } else { + segmentTaskIndexWrapper = 
segmentCache.get(tableSegmentUniqueIdentifier); + } + if (null != segmentTaskIndexWrapper) { + segmentList.add(tableSegmentUniqueIdentifier); + } + return segmentTaskIndexWrapper; + } + + /** + * the method is used to clear access count of the unused segments cacheable object + */ + public void close() { + segmentCache.clear(segmentList); + segmentCache =null; + } + + /** + * The method removes invalid segments from the segment level cache + * + * @param invalidSegments + */ + public void removeInvalidSegments(List<String> invalidSegments) { + segmentCache.removeSegments(invalidSegments, absoluteTableIdentifier); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index b69df86..2e92842 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -32,9 +32,11 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNode; import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import 
org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; @@ -49,6 +51,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.CarbonUtilException; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; @@ -208,6 +211,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { */ @Override public List<InputSplit> getSplits(JobContext job) throws IOException { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + CacheClient cacheClient = new CacheClient(identifier); List<String> invalidSegments = new ArrayList<>(); // get all valid segments and set them into the configuration @@ -222,7 +226,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // remove entry in the segment index if there are invalid segments invalidSegments.addAll(segments.getInvalidSegments()); if (invalidSegments.size() > 0) { - SegmentTaskIndexStore.getInstance().removeTableBlocks(invalidSegments, identifier); + cacheClient.removeInvalidSegments(invalidSegments); } } @@ -234,10 +238,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { List<InputSplit> splits; try { // do block filtering and get split - splits = getSplits(job, filterInterface); - } catch (IndexBuilderException e) { + splits = getSplits(job, filterInterface, cacheClient); + } catch (IndexBuilderException | CarbonUtilException e) { throw new IOException(e); } + finally { + cacheClient.close(); + cacheClient = null; + } // pass the invalid 
segment to task side in order to remove index entry in task side if (invalidSegments.size() > 0) { for (InputSplit split : splits) { @@ -272,8 +280,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * @return * @throws IOException */ - private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver) - throws IOException, IndexBuilderException { + private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, + CacheClient cacheClient) throws IOException, IndexBuilderException, CarbonUtilException { List<InputSplit> result = new LinkedList<InputSplit>(); @@ -281,12 +289,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { AbsoluteTableIdentifier absoluteTableIdentifier = getAbsoluteTableIdentifier(job.getConfiguration()); - //for each segment fetch blocks matching filter in Driver BTree for (String segmentNo : getSegmentsToAccess(job)) { List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier, - filterResolver, segmentNo); + filterResolver, segmentNo, cacheClient); for (DataRefNode dataRefNode : dataRefNodes) { BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode; TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo(); @@ -318,11 +325,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { private List<DataRefNode> getDataBlocksOfSegment(JobContext job, FilterExpressionProcessor filterExpressionProcessor, AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, - String segmentId) throws IndexBuilderException, IOException { + String segmentId, CacheClient cacheClient) + throws IndexBuilderException, IOException, CarbonUtilException { QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); QueryStatistic statistic = new QueryStatistic(); Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = - 
getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId); + getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient); List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>(); @@ -391,23 +399,31 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * @throws IndexBuilderException */ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs( - JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) - throws IOException, IndexBuilderException { - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = - SegmentTaskIndexStore.getInstance() - .getSegmentBTreeIfExists(absoluteTableIdentifier, segmentId); + JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId, + CacheClient cacheClient) throws IOException, IndexBuilderException, CarbonUtilException { + Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null; + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); + SegmentTaskIndexWrapper segmentTaskIndexWrapper = + cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier); + if (null != segmentTaskIndexWrapper) { + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } // if segment tree is not loaded, load the segment tree if (segmentIndexMap == null) { + // List<FileStatus> fileStatusList = new LinkedList<FileStatus>(); List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId); + // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList); Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>(); segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList); // get Btree blocks for given segment - segmentIndexMap = SegmentTaskIndexStore.getInstance() - .loadAndGetTaskIdToSegmentsMap(segmentToTableBlocksInfos, 
absoluteTableIdentifier); - + tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); + segmentTaskIndexWrapper = + cacheClient.getSegmentTaskIndexWrapper(tableSegmentUniqueIdentifier); + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); } return segmentIndexMap; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java index 82bcf1c..dd16e57 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java @@ -31,9 +31,11 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNode; import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore; +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; @@ -43,6 +45,8 @@ import 
org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstant import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtilException; +import org.apache.carbondata.hadoop.CacheClient; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.internal.index.Block; import org.apache.carbondata.hadoop.internal.index.Index; @@ -103,20 +107,33 @@ class InMemoryBTreeIndex implements Index { private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs( JobContext job, AbsoluteTableIdentifier identifier) throws IOException, IndexBuilderException { - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = - SegmentTaskIndexStore.getInstance().getSegmentBTreeIfExists(identifier, segment.getId()); - - // if segment tree is not loaded, load the segment tree - if (segmentIndexMap == null) { - List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job); - Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>(); - segmentToTableBlocksInfos.put(segment.getId(), tableBlockInfoList); - - // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input - // get Btree blocks for given segment - segmentIndexMap = SegmentTaskIndexStore.getInstance() - .loadAndGetTaskIdToSegmentsMap(segmentToTableBlocksInfos, identifier); - + Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null; + CacheClient cacheClient = new CacheClient(identifier); + TableSegmentUniqueIdentifier segmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(identifier, segment.getId()); + try { + SegmentTaskIndexWrapper segmentTaskIndexWrapper = + cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier); + if (null != 
segmentTaskIndexWrapper) { + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } + // if segment tree is not loaded, load the segment tree + if (segmentIndexMap == null) { + List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job); + Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>(); + segmentToTableBlocksInfos.put(segment.getId(), tableBlockInfoList); + segmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); + // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input + // get Btree blocks for given segment + segmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentUniqueIdentifier); + segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } + } + catch (CarbonUtilException e) { + throw new IndexBuilderException(e.getMessage(), e); + } + finally { + cacheClient.close(); } return segmentIndexMap; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 99dc853..e740caa 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -134,9 +134,6 @@ class CarbonMergerRDD[K, V]( result2 = exec.processTableBlocks() } catch { case e: Throwable => - if (null != exec) { - exec.finish() - } LOGGER.error(e) if (null != e.getMessage) { sys.error(s"Exception occurred in query execution :: ${ e.getMessage }") 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index ec6f456..f7f0802 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.hive.CarbonMetastore +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport import org.apache.carbondata.spark.rdd.SparkReadSupport @@ -40,6 +42,7 @@ object CarbonEnv { val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "") carbonEnv = CarbonEnv(catalog) initialized = true + CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index a1664a6..451b95d 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -20,19 +20,20 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File -import org.apache.carbondata.core.carbon.path.CarbonStorePath -import org.apache.spark.sql.Row +import scala.collection.JavaConverters._ + import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier +import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper +import org.apache.carbondata.core.carbon.path.CarbonStorePath import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.load.LoadMetadataDetails import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status +import org.apache.carbondata.hadoop.CacheClient import org.apache.carbondata.lcm.status.SegmentStatusManager -import org.scalatest.BeforeAndAfterAll - -import scala.collection.JavaConverters._ /** * FT for compaction scenario where major segment should not be included in minor. 
@@ -127,7 +128,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll test("delete merged folder and check segments") { // delete merged segments sql("clean files for table ignoremajor") - + sql("select * from ignoremajor").show() val identifier = new AbsoluteTableIdentifier( CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), new CarbonTableIdentifier( @@ -140,6 +141,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll assert(segments.contains("2.1")) assert(!segments.contains("2")) assert(!segments.contains("3")) + val cacheClient = new CacheClient(identifier); + val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2") + val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentTaskIndexWrapper(segmentIdentifier) + assert(null == wrapper) } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index 328e33b..136a8f3 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -21,6 +21,7 @@ package org.apache.carbondata.carbon.datastore; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; @@ -28,13 +29,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; import 
org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.ColumnarFormatVersion; import org.apache.carbondata.core.carbon.datastore.BlockIndexStore; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtilException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.test.util.StoreCreator; @@ -47,7 +53,8 @@ import org.junit.Test; public class BlockIndexStoreTest extends TestCase { - private BlockIndexStore indexStore; + // private BlockIndexStore indexStore; + BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache; private String property; @BeforeClass public void setUp() { @@ -55,7 +62,10 @@ public class BlockIndexStoreTest extends TestCase { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1"); StoreCreator.createCarbonStore(); - indexStore = BlockIndexStore.getInstance(); + CarbonProperties.getInstance(). 
+ addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10"); + CacheProvider cacheProvider = CacheProvider.getInstance(); + cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE, ""); } @AfterClass public void tearDown() { @@ -66,7 +76,8 @@ public class BlockIndexStoreTest extends TestCase { } } - @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException { + @Test public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment() + throws IOException, CarbonUtilException { String canonicalPath = new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath(); File file = getPartFile(); @@ -78,13 +89,26 @@ public class BlockIndexStoreTest extends TestCase { AbsoluteTableIdentifier absoluteTableIdentifier = new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); try { - List<AbstractIndex> loadAndGetBlocks = indexStore - .loadAndGetBlocks(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier); + + List<TableBlockUniqueIdentifier> tableBlockInfoList = + getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier); + List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList); assertTrue(loadAndGetBlocks.size() == 1); - } catch (IndexBuilderException e) { + } catch (CarbonUtilException e) { assertTrue(false); } - indexStore.clear(absoluteTableIdentifier); + List<String> segmentIds = new ArrayList<>(); + segmentIds.add(info.getSegmentId()); + cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); + } + + private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos, + AbsoluteTableIdentifier absoluteTableIdentifier) { + List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>(); + for (TableBlockInfo tableBlockInfo : tableBlockInfos) { + tableBlockUniqueIdentifiers.add(new 
TableBlockUniqueIdentifier(absoluteTableIdentifier, tableBlockInfo)); + } + return tableBlockUniqueIdentifiers; } @Test public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently() @@ -130,16 +154,21 @@ public class BlockIndexStoreTest extends TestCase { } catch (InterruptedException e) { e.printStackTrace(); } - + List<TableBlockInfo> tableBlockInfos = + Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }); try { - List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks( - Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }), - absoluteTableIdentifier); + List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = + getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); + List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers); assertTrue(loadAndGetBlocks.size() == 5); - } catch (IndexBuilderException e) { + } catch (CarbonUtilException e) { assertTrue(false); } - indexStore.clear(absoluteTableIdentifier); + List<String> segmentIds = new ArrayList<>(); + for (TableBlockInfo tableBlockInfo : tableBlockInfos) { + segmentIds.add(tableBlockInfo.getSegmentId()); + } + cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); } @Test public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently() @@ -197,15 +226,21 @@ public class BlockIndexStoreTest extends TestCase { // TODO Auto-generated catch block e.printStackTrace(); } + List<TableBlockInfo> tableBlockInfos = Arrays + .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }); try { - List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(Arrays - .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }), - absoluteTableIdentifier); + List<TableBlockUniqueIdentifier> blockUniqueIdentifierList = + getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); + List<AbstractIndex> 
loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList); assertTrue(loadAndGetBlocks.size() == 8); - } catch (IndexBuilderException e) { + } catch (CarbonUtilException e) { assertTrue(false); } - indexStore.clear(absoluteTableIdentifier); + List<String> segmentIds = new ArrayList<>(); + for (TableBlockInfo tableBlockInfo : tableBlockInfos) { + segmentIds.add(tableBlockInfo.getSegmentId()); + } + cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); } private class BlockLoaderThread implements Callable<Void> { @@ -214,13 +249,14 @@ public class BlockIndexStoreTest extends TestCase { public BlockLoaderThread(List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier) { - // TODO Auto-generated constructor stub this.tableBlockInfoList = tableBlockInfoList; this.absoluteTableIdentifier = absoluteTableIdentifier; } @Override public Void call() throws Exception { - indexStore.loadAndGetBlocks(tableBlockInfoList, absoluteTableIdentifier); + List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifierList = + getTableBlockUniqueIdentifierList(tableBlockInfoList, absoluteTableIdentifier); + cache.getAll(tableBlockUniqueIdentifierList); return null; }