[CARBONDATA-2557] [CARBONDATA-2472] [CARBONDATA-2570] Improve Carbon Reader performance on S3 and fix datamap clear issue in reader
[CARBONDATA-2557] [CARBONDATA-2472] Problem: CarbonReaderBuilder.build() is slow on S3; it takes around 8 seconds to complete. Solution: S3 is slow for the listFiles, open, fileExists, and getCarbonFile operations, so list all calls to those APIs in the reader flow and remove the redundant checks. [CARBONDATA-2570] Problem: With the Carbon SDK reader, a second reader instance fails in the cluster test. Solution: The blocklet datamaps of the first reader are not cleared properly in the cluster, so the API used to clear the blocklet datamap must be changed from DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear(); to DataMapStoreManager.getInstance().clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier()); This closes #2345 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f68a792 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f68a792 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f68a792 Branch: refs/heads/carbonstore Commit: 5f68a792f2e83d15379740f715cf05d7ae9aaa05 Parents: 2f23486 Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Sun May 27 22:49:23 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jun 5 16:23:27 2018 +0530 ---------------------------------------------------------------------- .../core/datamap/dev/CacheableDataMap.java | 6 +- .../core/datastore/SegmentTaskIndexStore.java | 2 +- .../indexstore/BlockletDataMapIndexStore.java | 84 +++++++------ .../TableBlockIndexUniqueIdentifierWrapper.java | 52 ++++++++ .../blockletindex/BlockletDataMapFactory.java | 122 ++++++++----------- .../blockletindex/SegmentIndexFileStore.java | 15 +++ .../core/metadata/schema/table/CarbonTable.java | 60 ++++----- .../LatestFilesReadCommittedScope.java | 19 +-- .../SegmentUpdateStatusManager.java | 15 ++- .../core/util/BlockletDataMapUtil.java | 50 +++++++- .../apache/carbondata/core/util/CarbonUtil.java | 30 +++++ .../TestBlockletDataMapFactory.java | 13 +- docs/sdk-guide.md | 10 -- .../examples/sdk/CarbonReaderExample.java | 1 - .../carbondata/hadoop/CarbonRecordReader.java | 3 +- .../hadoop/api/CarbonFileInputFormat.java | 97 ++++----------- .../hadoop/api/CarbonInputFormat.java | 24 ++++ ...FileInputFormatWithExternalCarbonTable.scala | 2 +- ...tCreateTableUsingSparkCarbonFileFormat.scala | 2 +- .../TestNonTransactionalCarbonTable.scala | 11 +- .../sdk/file/CarbonReaderBuilder.java | 51 ++------ .../carbondata/sdk/file/CarbonReaderTest.java | 4 +- 22 files changed, 375 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java index dba0840..e292c60 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper; -import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import 
org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper; import org.apache.carbondata.core.memory.MemoryException; /** @@ -33,10 +33,10 @@ public interface CacheableDataMap { /** * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier * - * @param tableBlockIndexUniqueIdentifier + * @param tableBlockIndexUniqueIdentifierWrapper * @param blockletDataMapIndexWrapper */ - void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException; /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java index d325f21..c642091 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java @@ -91,7 +91,7 @@ public class SegmentTaskIndexStore segmentTaskIndexWrapper = loadAndGetTaskIdToSegmentsMap( tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(), - CarbonTable.buildFromTablePath("name", "path", false), + CarbonTable.buildDummyTable("path"), tableSegmentUniqueIdentifier); } catch (IndexBuilderException e) { throw new IOException(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index db49976..71a9b5a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -41,7 +41,7 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil; * blocks */ public class BlockletDataMapIndexStore - implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> { + implements Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> { private static final LogService LOGGER = LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName()); /** @@ -68,8 +68,10 @@ public class BlockletDataMapIndexStore } @Override - public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier) + public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) throws IOException { + TableBlockIndexUniqueIdentifier identifier = + identifierWrapper.getTableBlockIndexUniqueIdentifier(); String lruCacheKey = identifier.getUniqueTableSegmentIdentifier(); BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey); @@ -84,7 +86,7 @@ public class BlockletDataMapIndexStore // if the identifier is not a merge file we can directly load the datamaps if (identifier.getMergeIndexFileName() == null) { Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil - 
.getBlockMetaInfoMap(identifier, indexFileStore, filesRead, + .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, carbonDataFileBlockMetaInfoMapping); BlockletDataMap blockletDataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap); @@ -96,9 +98,10 @@ public class BlockletDataMapIndexStore BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore); for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier : tableBlockIndexUniqueIdentifiers) { - Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil - .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead, - carbonDataFileBlockMetaInfoMapping); + Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil.getBlockMetaInfoMap( + new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier, + identifierWrapper.getCarbonTable()), indexFileStore, filesRead, + carbonDataFileBlockMetaInfoMapping); BlockletDataMap blockletDataMap = loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap); dataMaps.add(blockletDataMap); @@ -119,26 +122,28 @@ public class BlockletDataMapIndexStore return blockletDataMapIndexWrapper; } - @Override - public List<BlockletDataMapIndexWrapper> getAll( - List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException { + @Override public List<BlockletDataMapIndexWrapper> getAll( + List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers) + throws IOException { List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = new ArrayList<>(tableSegmentUniqueIdentifiers.size()); - List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>(); + List<TableBlockIndexUniqueIdentifierWrapper> missedIdentifiersWrapper = new ArrayList<>(); BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null; // Get the datamaps for each indexfile from cache. 
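[Editor's aside — a minimal sketch of the caching pattern this hunk introduces, using simplified stand-in types rather than the real CarbonData classes: the new TableBlockIndexUniqueIdentifierWrapper travels through the read path as context that carries the CarbonTable, while cache identity still comes only from the wrapped identifier's unique key, so wrapping does not change LRU behavior.]
```java
import java.util.HashMap;
import java.util.Map;

final class IdentifierStub {
  private final String uniqueKey;
  IdentifierStub(String uniqueKey) { this.uniqueKey = uniqueKey; }
  String getUniqueTableSegmentIdentifier() { return uniqueKey; }
}

final class IdentifierWrapperStub {
  private final IdentifierStub identifier;
  private final Object carbonTable; // context only; never part of the cache key
  IdentifierWrapperStub(IdentifierStub identifier, Object carbonTable) {
    this.identifier = identifier;
    this.carbonTable = carbonTable;
  }
  IdentifierStub getIdentifier() { return identifier; }
  Object getCarbonTable() { return carbonTable; }
}

final class IndexStoreSketch<V> {
  private final Map<String, V> lruCache = new HashMap<>();

  // Mirrors getIfPresent(...) above: the lookup is keyed on the wrapped identifier.
  V getIfPresent(IdentifierWrapperStub wrapper) {
    return lruCache.get(wrapper.getIdentifier().getUniqueTableSegmentIdentifier());
  }

  void put(IdentifierWrapperStub wrapper, V value) {
    lruCache.put(wrapper.getIdentifier().getUniqueTableSegmentIdentifier(), value);
  }
}
```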
try { - for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) { - BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier); + for (TableBlockIndexUniqueIdentifierWrapper + identifierWrapper : tableSegmentUniqueIdentifiers) { + BlockletDataMapIndexWrapper dataMapIndexWrapper = + getIfPresent(identifierWrapper); if (dataMapIndexWrapper != null) { blockletDataMapIndexWrappers.add(dataMapIndexWrapper); } else { - missedIdentifiers.add(identifier); + missedIdentifiersWrapper.add(identifierWrapper); } } - if (missedIdentifiers.size() > 0) { - for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) { - blockletDataMapIndexWrapper = get(identifier); + if (missedIdentifiersWrapper.size() > 0) { + for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) { + blockletDataMapIndexWrapper = get(identifierWrapper); blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper); } } @@ -151,37 +156,40 @@ public class BlockletDataMapIndexStore } throw new IOException("Problem in loading segment blocks.", e); } + return blockletDataMapIndexWrappers; } /** * returns the SegmentTaskIndexWrapper * - * @param tableSegmentUniqueIdentifier + * @param tableSegmentUniqueIdentifierWrapper * @return */ - @Override - public BlockletDataMapIndexWrapper getIfPresent( - TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) { + @Override public BlockletDataMapIndexWrapper getIfPresent( + TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) { return (BlockletDataMapIndexWrapper) lruCache.get( - tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier() + .getUniqueTableSegmentIdentifier()); } /** * method invalidate the segment cache for segment * - * @param tableSegmentUniqueIdentifier + * @param tableSegmentUniqueIdentifierWrapper */ - @Override - public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) { - lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + @Override public void invalidate( + TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) { + lruCache.remove(tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier() + .getUniqueTableSegmentIdentifier()); } @Override - public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException { String uniqueTableSegmentIdentifier = - tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(); + tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier() + .getUniqueTableSegmentIdentifier(); Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier); if (lock == null) { lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier); @@ -190,16 +198,16 @@ public class BlockletDataMapIndexStore // as in that case clearing unsafe memory need to be taken card. 
If at all datamap entry // in the cache need to be overwritten then use the invalidate interface // and then use the put interface - if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { + if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) { synchronized (lock) { - if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { + if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) { List<BlockletDataMap> dataMaps = wrapper.getDataMaps(); try { for (BlockletDataMap blockletDataMap: dataMaps) { blockletDataMap.convertToUnsafeDMStore(); } - lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper, - wrapper.getMemorySize()); + lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier() + .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize()); } catch (Throwable e) { // clear all the memory acquired by data map in case of any failure for (DataMap blockletDataMap : dataMaps) { @@ -264,14 +272,14 @@ public class BlockletDataMapIndexStore /** * The method clears the access count of table segments * - * @param tableSegmentUniqueIdentifiers + * @param tableSegmentUniqueIdentifiersWrapper */ - @Override - public void clearAccessCount( - List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) { - for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) { - BlockletDataMap cacheable = - (BlockletDataMap) lruCache.get(identifier.getUniqueTableSegmentIdentifier()); + @Override public void clearAccessCount( + List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiersWrapper) { + for (TableBlockIndexUniqueIdentifierWrapper + identifierWrapper : tableSegmentUniqueIdentifiersWrapper) { + BlockletDataMap cacheable = (BlockletDataMap) lruCache.get( + identifierWrapper.getTableBlockIndexUniqueIdentifier().getUniqueTableSegmentIdentifier()); cacheable.clear(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java new file mode 100644 index 0000000..3411397 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.indexstore; + +import java.io.Serializable; + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; + +/** + * Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info + * This is just a wrapper passed between methods like a context, This object must never be cached. + * + */ +public class TableBlockIndexUniqueIdentifierWrapper implements Serializable { + + private static final long serialVersionUID = 1L; + + // holds the reference to tableBlockIndexUniqueIdentifier + private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier; + + // holds the reference to CarbonTable + private CarbonTable carbonTable; + + public TableBlockIndexUniqueIdentifierWrapper( + TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) { + this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier; + this.carbonTable = carbonTable; + } + + public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() { + return tableBlockIndexUniqueIdentifier; + } + + public CarbonTable getCarbonTable() { + return carbonTable; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 318fc6e..c434e2e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -44,16 +44,12 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.BlockletDataMapUtil; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -81,7 +77,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory // segmentId -> list of index file private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new ConcurrentHashMap<>(); - private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache; + private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache; public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) { super(carbonTable, dataMapSchema); @@ -104,11 
+100,15 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); - List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = new ArrayList<>(identifiers.size()); - tableBlockIndexUniqueIdentifiers.addAll(identifiers); + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = - cache.getAll(tableBlockIndexUniqueIdentifiers); + cache.getAll(tableBlockIndexUniqueIdentifierWrappers); for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) { dataMaps.addAll(wrapper.getDataMaps()); } @@ -120,12 +120,6 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = segmentMap.get(segment.getSegmentNo()); if (tableBlockIndexUniqueIdentifiers == null) { - CarbonTable carbonTable = this.getCarbonTable(); - if (!carbonTable.getTableInfo().isTransactionalTable()) { - // For NonTransactional table, compare the schema of all index files with inferred schema. - // If there is a mismatch throw exception. As all files must be of same schema. - validateSchemaForNewTranscationalTableFiles(segment, carbonTable); - } tableBlockIndexUniqueIdentifiers = BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment); segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers); @@ -133,46 +127,6 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory return tableBlockIndexUniqueIdentifiers; } - private void validateSchemaForNewTranscationalTableFiles(Segment segment, CarbonTable carbonTable) - throws IOException { - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - Map<String, String> indexFiles = segment.getCommittedIndexFile(); - for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) { - Path indexFile = new Path(indexFileEntry.getKey()); - org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile( - indexFile.toString(), carbonTable.getTableName()); - TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( - tableInfo, identifier.getDatabaseName(), - identifier.getTableName(), - identifier.getTablePath()); - List<ColumnSchema> indexFileColumnList = - wrapperTableInfo.getFactTable().getListOfColumns(); - List<ColumnSchema> tableColumnList = - carbonTable.getTableInfo().getFactTable().getListOfColumns(); - if (!isSameColumnSchemaList(indexFileColumnList, tableColumnList)) { - LOG.error("Schema of " + indexFile.getName() - + " doesn't match with the table's schema"); - throw new IOException("All the files doesn't have same schema. " - + "Unsupported operation on nonTransactional table. 
Check logs."); - } - } - } - - private boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList, - List<ColumnSchema> tableColumnList) { - if (indexFileColumnList.size() != tableColumnList.size()) { - LOG.error("Index file's column size is " + indexFileColumnList.size() - + " but table's column size is " + tableColumnList.size()); - return false; - } - for (int i = 0; i < tableColumnList.size(); i++) { - if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) { - return false; - } - } - return true; - } - /** * Get the blocklet detail information based on blockletid, blockid and segmentid. This method is * exclusively for BlockletDataMapFactory as detail information is only available in this @@ -191,9 +145,16 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory } Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); + Set<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = + new HashSet<>(identifiers.size()); + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } // Retrieve each blocklets detail information from blocklet datamap for (Blocklet blocklet : blocklets) { - detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet)); + detailedBlocklets.add(getExtendedBlocklet(tableBlockIndexUniqueIdentifierWrappers, blocklet)); } return detailedBlocklets; } @@ -204,14 +165,24 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory if (blocklet instanceof ExtendedBlocklet) { return (ExtendedBlocklet) blocklet; } - Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); - return getExtendedBlocklet(identifiers, blocklet); + Set<TableBlockIndexUniqueIdentifier> identifiers = + getTableBlockIndexUniqueIdentifiers(segment); + + Set<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = + new HashSet<>(identifiers.size()); + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } + return getExtendedBlocklet(tableBlockIndexUniqueIdentifierWrappers, blocklet); } - private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers, - Blocklet blocklet) throws IOException { - for (TableBlockIndexUniqueIdentifier identifier : identifiers) { - BlockletDataMapIndexWrapper wrapper = cache.get(identifier); + private ExtendedBlocklet getExtendedBlocklet( + Set<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper, Blocklet blocklet) + throws IOException { + for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : identifiersWrapper) { + BlockletDataMapIndexWrapper wrapper = cache.get(identifierWrapper); List<BlockletDataMap> dataMaps = wrapper.getDataMaps(); for (DataMap dataMap : dataMaps) { if (((BlockletDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) { @@ -265,12 +236,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo()); if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { - BlockletDataMapIndexWrapper 
wrapper = cache.getIfPresent(blockIndex); + TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper = + new TableBlockIndexUniqueIdentifierWrapper(blockIndex, this.getCarbonTable()); + BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndexWrapper); if (null != wrapper) { List<BlockletDataMap> dataMaps = wrapper.getDataMaps(); for (DataMap dataMap : dataMaps) { if (dataMap != null) { - cache.invalidate(blockIndex); + cache.invalidate(blockIndexWrapper); dataMap.clear(); } } @@ -292,27 +265,28 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable) throws IOException { BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable; - List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>(); + List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>(); Path indexPath = new Path(mapDistributable.getFilePath()); String segmentNo = mapDistributable.getSegment().getSegmentNo(); if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { String parent = indexPath.getParent().toString(); - identifiers - .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo)); + identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper( + new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo), + this.getCarbonTable())); } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString()); String parentPath = carbonFile.getParentFile().getAbsolutePath(); List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath()); for (String indexFile : indexFiles) { - identifiers.add( + identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper( new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(), - segmentNo)); + segmentNo), this.getCarbonTable())); } } List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); try { - List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiers); + List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiersWrapper); for (BlockletDataMapIndexWrapper wrapper : wrappers) { dataMaps.addAll(wrapper.getDataMaps()); } @@ -356,9 +330,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory return false; } - @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + @Override + public void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException { - cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper); + cache.put(tableBlockIndexUniqueIdentifierWrapper, blockletDataMapIndexWrapper); } @Override @@ -373,7 +348,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers, (BlockletDataMapDistributable) distributable); - if (null == cache.getIfPresent(validIdentifier)) { + if (null == cache.getIfPresent( + new TableBlockIndexUniqueIdentifierWrapper(validIdentifier, this.getCarbonTable()))) { ((BlockletDataMapDistributable) distributable) 
.setTableBlockIndexUniqueIdentifier(validIdentifier); distributablesToBeLoaded.add(distributable); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index c2686d0..35e512d 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -323,6 +323,21 @@ public class SegmentIndexFileStore { /** * List all the index files of the segment. * + * @param carbonFile directory + * @return + */ + public static CarbonFile[] getCarbonIndexFiles(CarbonFile carbonFile) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() + .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0); + } + }); + } + + /** + * List all the index files of the segment. + * * @param segmentPath * @return */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index ba051be..6949643 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -218,17 +218,9 @@ public class CarbonTable implements Serializable { } } - public static CarbonTable buildFromTablePath(String tableName, String tablePath, - boolean isTransactionalTable) throws IOException { - if (isTransactionalTable) { - return SchemaReader - .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, "default", tableName)); - } else { - // Infer the schema from the Carbondata file. 
- TableInfo tableInfoInfer = - SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "null", "null"), false); - return CarbonTable.buildFromTableInfo(tableInfoInfer); - } + public static CarbonTable buildDummyTable(String tablePath) throws IOException { + TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null"); + return CarbonTable.buildFromTableInfo(tableInfoInfer); } public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath) @@ -241,24 +233,7 @@ public class CarbonTable implements Serializable { */ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) { CarbonTable table = new CarbonTable(); - updateTableInfo(tableInfo); - table.tableInfo = tableInfo; - table.blockSize = tableInfo.getTableBlockSizeInMB(); - table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime(); - table.tableUniqueName = tableInfo.getTableUniqueName(); - table.setTransactionalTable(tableInfo.isTransactionalTable()); - table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable()); - table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName()); - if (tableInfo.getFactTable().getBucketingInfo() != null) { - table.tableBucketMap.put(tableInfo.getFactTable().getTableName(), - tableInfo.getFactTable().getBucketingInfo()); - } - if (tableInfo.getFactTable().getPartitionInfo() != null) { - table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(), - tableInfo.getFactTable().getPartitionInfo()); - } - table.hasDataMapSchema = - null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0; + updateTableByTableInfo(table, tableInfo); return table; } @@ -996,4 +971,31 @@ public class CarbonTable implements Serializable { } return indexColumn; } + + /** + * update the carbon table by using the passed tableInfo + * + * @param table + * @param tableInfo + */ + public static void updateTableByTableInfo(CarbonTable table, TableInfo tableInfo) { + updateTableInfo(tableInfo); + table.tableInfo = tableInfo; + table.blockSize = tableInfo.getTableBlockSizeInMB(); + table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime(); + table.tableUniqueName = tableInfo.getTableUniqueName(); + table.setTransactionalTable(tableInfo.isTransactionalTable()); + table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable()); + table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName()); + if (tableInfo.getFactTable().getBucketingInfo() != null) { + table.tableBucketMap.put(tableInfo.getFactTable().getTableName(), + tableInfo.getFactTable().getBucketingInfo()); + } + if (tableInfo.getFactTable().getPartitionInfo() != null) { + table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(), + tableInfo.getFactTable().getPartitionInfo()); + } + table.hasDataMapSchema = + null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java index 6a1234e..63cfa21 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java +++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java @@ -23,7 +23,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.mutate.UpdateVO; @@ -157,28 +156,20 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { @Override public void takeCarbonIndexFileSnapShot() throws IOException { // Read the current file Path get the list of indexes from the path. CarbonFile file = FileFactory.getCarbonFile(carbonFilePath); - CarbonFile[] files = file.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() - .endsWith(CarbonTablePath.CARBON_DATA_EXT) || file.getName().endsWith("Fact"); - } - }); - if (files.length == 0) { - // For nonTransactional table, files can be removed at any point of time. - // So cannot assume files will be present - throw new IOException("No files are present in the table location :" + carbonFilePath); - } Map<String, List<String>> indexFileStore = new HashMap<>(); Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>(); CarbonFile[] carbonIndexFiles = null; if (file.isDirectory()) { if (segmentId == null) { - carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); + carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file); } else { String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId); carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); } + if (carbonIndexFiles.length == 0) { + throw new IOException( + "No Index files are present in the table location :" + carbonFilePath); + } for (int i = 0; i < carbonIndexFiles.length; i++) { // TODO. If Required to support merge index, then this code has to be modified. // TODO. Nested File Paths. http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 1c53fbb..c2faadc 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -86,10 +86,19 @@ public class SegmentUpdateStatusManager { this.identifier = table.getAbsoluteTableIdentifier(); // current it is used only for read function scenarios, as file update always requires to work // on latest file status. 
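[Editor's aside — a hedged sketch of the guard added to SegmentUpdateStatusManager in this hunk, with stand-in types: readLoadMetadata() implies fileExists/listFiles calls that are costly on S3, so non-transactional tables short-circuit to empty metadata instead of touching the file system.]
```java
final class UpdateStatusSketch {
  private final String[] segmentDetails;
  private final String[] updateDetails;

  UpdateStatusSketch(boolean isTransactionalTable) {
    if (!isTransactionalTable) {
      // fileExists is a costly operation on S3, so decide based on the table type
      segmentDetails = new String[0];
    } else {
      segmentDetails = readLoadMetadataStub();
    }
    // update details are only read when segment metadata actually exists
    updateDetails = (segmentDetails.length != 0) ? readUpdateMetadataStub() : new String[0];
  }

  // Stubs standing in for the real metadata readers.
  private String[] readLoadMetadataStub() { return new String[] {"Segment_0"}; }
  private String[] readUpdateMetadataStub() { return new String[0]; }
}
```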
- segmentDetails = SegmentStatusManager.readLoadMetadata( - CarbonTablePath.getMetadataPath(identifier.getTablePath())); + if (!table.getTableInfo().isTransactionalTable()) { + // fileExist is costly operation, so check based on table Type + segmentDetails = new LoadMetadataDetails[0]; + } else { + segmentDetails = SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(identifier.getTablePath())); + } isPartitionTable = table.isHivePartitionTable(); - updateDetails = readLoadMetadata(); + if (segmentDetails.length != 0) { + updateDetails = readLoadMetadata(); + } else { + updateDetails = new SegmentUpdateDetails[0]; + } populateMap(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index 0d28b9f..518cd03 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -33,20 +33,31 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; public class BlockletDataMapUtil { + private static final Log LOG = LogFactory.getLog(BlockletDataMapUtil.class); + public static Map<String, BlockMetaInfo> getBlockMetaInfoMap( - TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore, - Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping) - throws IOException { + TableBlockIndexUniqueIdentifierWrapper identifierWrapper, + SegmentIndexFileStore indexFileStore, Set<String> filesRead, + Map<String, BlockMetaInfo> fileNameToMetaInfoMapping) throws IOException { + boolean isTransactionalTable = true; + TableBlockIndexUniqueIdentifier identifier = + identifierWrapper.getTableBlockIndexUniqueIdentifier(); + List<ColumnSchema> tableColumnList = null; if (identifier.getMergeIndexFileName() != null && indexFileStore.getFileData(identifier.getIndexFileName()) == null) { CarbonFile indexMergeFile = FileFactory.getCarbonFile( @@ -67,7 +78,25 @@ public class BlockletDataMapUtil { List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo( identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName())); + CarbonTable carbonTable = identifierWrapper.getCarbonTable(); + if (carbonTable != null) { + isTransactionalTable = 
carbonTable.getTableInfo().isTransactionalTable(); + tableColumnList = + carbonTable.getTableInfo().getFactTable().getListOfColumns(); + } for (DataFileFooter footer : indexInfo) { + if ((!isTransactionalTable) && (tableColumnList.size() != 0) && + !isSameColumnSchemaList(footer.getColumnInTable(), tableColumnList)) { + LOG.error("Schema of " + identifier.getIndexFileName() + + " doesn't match with the table's schema"); + throw new IOException("All the files doesn't have same schema. " + + "Unsupported operation on nonTransactional table. Check logs."); + } + if ((tableColumnList != null) && (tableColumnList.size() == 0)) { + // Carbon reader have used dummy columnSchema. Update it with inferred schema now + carbonTable.getTableInfo().getFactTable().setListOfColumns(footer.getColumnInTable()); + CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo()); + } String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); if (null == blockMetaInfoMap.get(blockPath)) { blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath)); @@ -156,6 +185,7 @@ public class BlockletDataMapUtil { * This method will the index files tableBlockIndexUniqueIdentifiers of a merge index file * * @param identifier + * @param segmentIndexFileStore * @return * @throws IOException */ @@ -177,4 +207,18 @@ public class BlockletDataMapUtil { return tableBlockIndexUniqueIdentifiers; } + private static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList, + List<ColumnSchema> tableColumnList) { + if (indexFileColumnList.size() != tableColumnList.size()) { + LOG.error("Index file's column size is " + indexFileColumnList.size() + + " but table's column size is " + tableColumnList.size()); + return false; + } + for (int i = 0; i < tableColumnList.size(); i++) { + if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) { + return false; + } + } + return true; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 5a7bce3..e1e5e16 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -53,6 +53,7 @@ import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.SegmentInfo; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter; @@ -2371,6 +2372,35 @@ public final class CarbonUtil { } /** + * This method will prepare dummy tableInfo + * + * @param carbonDataFilePath + * @param tableName + * @return + */ + public static TableInfo buildDummyTableInfo(String carbonDataFilePath, + String tableName, String dbName) { + // During SDK carbon Reader, This method will be called. + // This API will avoid IO operation to get the columnSchema list. 
+ // ColumnSchema list will be filled during blocklet loading (where actual IO happens) + List<ColumnSchema> columnSchemaList = new ArrayList<>(); + TableSchema tableSchema = getDummyTableSchema(tableName,columnSchemaList); + ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter = + new ThriftWrapperSchemaConverterImpl(); + org.apache.carbondata.format.TableSchema thriftFactTable = + thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema); + org.apache.carbondata.format.TableInfo tableInfo = + new org.apache.carbondata.format.TableInfo(thriftFactTable, + new ArrayList<org.apache.carbondata.format.TableSchema>()); + tableInfo.setDataMapSchemas(null); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( + tableInfo, dbName, tableName, carbonDataFilePath); + wrapperTableInfo.setTransactionalTable(false); + return wrapperTableInfo; + } + + /** * This method will infer the schema file from a given index file path * @param indexFilePath * @param tableName http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java index dfbdd29..526f630 100644 --- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java +++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -57,7 +58,9 @@ public class TestBlockletDataMapFactory { private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier; - private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache; + private TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper; + + private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache; @Before public void setUp() throws ClassNotFoundException, IllegalAccessException, InvocationTargetException, @@ -78,6 +81,8 @@ public class TestBlockletDataMapFactory { tableBlockIndexUniqueIdentifier = new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0", "0_batchno0-0-1521012756709.carbonindex", null, "0"); + tableBlockIndexUniqueIdentifierWrapper = + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, carbonTable); cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP); } @@ -86,12 +91,12 @@ public class TestBlockletDataMapFactory { IllegalAccessException { List<BlockletDataMap> dataMaps = new ArrayList<>(); Method method = BlockletDataMapFactory.class - .getDeclaredMethod("cache", 
TableBlockIndexUniqueIdentifier.class, + .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifierWrapper.class, BlockletDataMapIndexWrapper.class); method.setAccessible(true); - method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier, + method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper, new BlockletDataMapIndexWrapper(dataMaps)); - BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier); + BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper); assert null != result; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/docs/sdk-guide.md ---------------------------------------------------------------------- diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md index 5dbb5ac..0f20dc3 100644 --- a/docs/sdk-guide.md +++ b/docs/sdk-guide.md @@ -460,16 +460,6 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/ ``` /** - * Project all Columns for carbon reader - * - * @return CarbonReaderBuilder object - * @throws IOException - */ - public CarbonReaderBuilder projectAllColumns(); -``` - -``` - /** * Configure the transactional status of table * If set to false, then reads the carbondata and carbonindex files from a flat folder structure. * If set to true, then reads the carbondata and carbonindex files from segment folder structure. http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java index 8d3ff0d..ada1a8c 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java @@ -116,7 +116,6 @@ public class CarbonReaderExample { // Read data CarbonReader reader2 = CarbonReader .builder(path, "_temp") - .projectAllColumns() .build(); System.out.println("\nData:"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index da84c00..4911e41 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -123,7 +123,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> { } } // Clear the datamap cache - DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear(); + DataMapStoreManager.getInstance() + .clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier()); // close read support readSupport.close(); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index 8ed89d5..8755176 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -23,26 +23,21 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; @@ -105,8 +100,10 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se */ @Override public List<InputSplit> getSplits(JobContext job) throws IOException { + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); } @@ -115,6 +112,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se // get all valid segments and set them into the configuration // check for externalTable segment (Segment_null) // process and resolve the expression + ReadCommittedScope readCommittedScope = null; if (carbonTable.isTransactionalTable()) { readCommittedScope = new LatestFilesReadCommittedScope( @@ -129,44 +127,33 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter); - String segmentDir = null; + // if external table Segments are found, add it to the List + List<Segment> externalTableSegments = new ArrayList<Segment>(); + Segment seg; if (carbonTable.isTransactionalTable()) { - segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null"); + // SDK some cases write into the Segment Path instead of Table Path i.e. inside + // the "Fact/Part0/Segment_null". The segment in this case is named as "null". + // The table is denoted by default as a transactional table and goes through + // the path of CarbonFileInputFormat. The above scenario is handled in the below code. 
+ seg = new Segment("null", null, readCommittedScope); + externalTableSegments.add(seg); } else { - segmentDir = identifier.getTablePath(); - } - FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); - if (FileFactory.isFileExist(segmentDir, fileType)) { - // if external table Segments are found, add it to the List - List<Segment> externalTableSegments = new ArrayList<Segment>(); - Segment seg; - if (carbonTable.isTransactionalTable()) { - // SDK some cases write into the Segment Path instead of Table Path i.e. inside - // the "Fact/Part0/Segment_null". The segment in this case is named as "null". - // The table is denoted by default as a transactional table and goes through - // the path of CarbonFileInputFormat. The above scenario is handled in the below code. - seg = new Segment("null", null, readCommittedScope); + LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); + for (LoadMetadataDetails load : loadMetadataDetails) { + seg = new Segment(load.getLoadName(), null, readCommittedScope); externalTableSegments.add(seg); - } else { - LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); - for (LoadMetadataDetails load : loadMetadataDetails) { - seg = new Segment(load.getLoadName(), null, readCommittedScope); - externalTableSegments.add(seg); - } } - - Map<String, String> indexFiles = - new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir); - - if (indexFiles.size() == 0) { - throw new RuntimeException("Index file not present to read the carbondata file"); - } - // do block filtering and get split - List<InputSplit> splits = - getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null); - - return splits; } + // do block filtering and get split + List<InputSplit> splits = + getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null); + if (getColumnProjection(job.getConfiguration()) == null) { + // If the user projection is empty, use default all columns as projections. + // All column name will be filled inside getSplits, so can update only here. + String[] projectionColumns = projectAllColumns(carbonTable); + setColumnProjection(job.getConfiguration(), projectionColumns); + } + return splits; } return null; } @@ -185,45 +172,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se numSegments = validSegments.size(); List<InputSplit> result = new LinkedList<InputSplit>(); - UpdateVO invalidBlockVOForSegmentId = null; - Boolean isIUDTable = false; - - SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable); - - isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); // for each segment fetch blocks matching filter in Driver BTree List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); numBlocks = dataBlocksOfSegment.size(); - for (CarbonInputSplit inputSplit : dataBlocksOfSegment) { - - // Get the UpdateVO for those tables on which IUD operations being performed. - if (isIUDTable) { - invalidBlockVOForSegmentId = - updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId()); - } - String[] deleteDeltaFilePath = null; - if (isIUDTable) { - // In case IUD is not performed in this table avoid searching for - // invalidated blocks. 
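[Editor's aside — a hedged sketch of the default-projection helper (projectAllColumns) that this commit adds to CarbonInputFormat, using a stand-in column type: when the caller sets no projection, every column with a valid schema ordinal is projected.]
```java
import java.util.ArrayList;
import java.util.List;

final class ColumnStub {
  final String id;
  final int schemaOrdinal; // -1 marks internal/invisible columns
  ColumnStub(String id, int schemaOrdinal) { this.id = id; this.schemaOrdinal = schemaOrdinal; }
}

final class ProjectionSketch {
  // Collect identifiers of all user-visible columns as the default projection.
  static String[] projectAllColumns(List<ColumnStub> columns) {
    List<String> projection = new ArrayList<>();
    for (ColumnStub col : columns) {
      if (col.schemaOrdinal != -1) {
        projection.add(col.id);
      }
    }
    return projection.toArray(new String[0]);
  }
}
```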
- if (CarbonUtil - .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(), - invalidBlockVOForSegmentId, updateStatusManager)) { - continue; - } - // When iud is done then only get delete delta files for a block - try { - deleteDeltaFilePath = updateStatusManager - .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId()); - } catch (Exception e) { - throw new IOException(e); - } - } - inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath); - result.add(inputSplit); - } + result.addAll(dataBlocksOfSegment); return result; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 05c70f8..485b087 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.profiler.ExplainCollector; import org.apache.carbondata.core.scan.expression.Expression; @@ -675,4 +676,27 @@ m filterExpression return false; } } + + /** + * Project all Columns for carbon reader + * + * @return String araay of columnNames + * @param carbonTable + */ + public String[] projectAllColumns(CarbonTable carbonTable) { + List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns(); + List<String> projectColumn = new ArrayList<>(); + for (ColumnSchema cols : colList) { + if (cols.getSchemaOrdinal() != -1) { + projectColumn.add(cols.getColumnUniqueId()); + } + } + String[] projectionColumns = new String[projectColumn.size()]; + int i = 0; + for (String columnName : projectColumn) { + projectionColumns[i] = columnName; + i++; + } + return projectionColumns; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala index e6d39d3..0e6f0c7 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala @@ -184,7 +184,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be { sql("select * from sdkOutputTable").show(false) } - 
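A note on the new default projection: whenever getColumnProjection() returns null, getSplits() now fills the projection via projectAllColumns(), keeping every column whose schemaOrdinal is not -1 (implicit/internal columns are skipped) and returning the column unique ids. Below is a minimal, self-contained sketch of that selection rule; SimpleColumn is a hypothetical stand-in for ColumnSchema, used only so the example compiles on its own.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProjectAllColumnsSketch {

  // Hypothetical stand-in for ColumnSchema; a schemaOrdinal of -1 marks an
  // implicit/internal column that must not appear in the default projection.
  static class SimpleColumn {
    final String name;
    final int schemaOrdinal;

    SimpleColumn(String name, int schemaOrdinal) {
      this.name = name;
      this.schemaOrdinal = schemaOrdinal;
    }
  }

  // Same selection rule as projectAllColumns(CarbonTable) above.
  static String[] projectAllColumns(List<SimpleColumn> columns) {
    List<String> projected = new ArrayList<>();
    for (SimpleColumn col : columns) {
      if (col.schemaOrdinal != -1) {
        projected.add(col.name);
      }
    }
    return projected.toArray(new String[0]);
  }

  public static void main(String[] args) {
    List<SimpleColumn> schema = Arrays.asList(
        new SimpleColumn("name", 0),
        new SimpleColumn("age", 1),
        new SimpleColumn("positionId", -1)); // implicit column, skipped
    // prints [name, age]
    System.out.println(Arrays.toString(projectAllColumns(schema)));
  }
}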
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index e6d39d3..0e6f0c7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -184,7 +184,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
     {
       sql("select * from sdkOutputTable").show(false)
     }
-    assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+    assert(exception.getMessage().contains("Error while taking index snapshot"))
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index 211bc8c..d7e500e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -346,7 +346,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     {
       sql("select * from sdkOutputTable").show(false)
     }
-    assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+    assert(exception.getMessage().contains("Error while taking index snapshot"))
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 095d12d..14a63ca 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -993,7 +993,14 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       sql("select * from sdkOutputTable").show(false)
     }
     assert(exception.getMessage()
-      .contains("All the files doesn't have same schema"))
+      .contains("Problem in loading segment blocks."))
+
+    val exception1 =
+      intercept[IOException] {
+        sql("select count(*) from sdkOutputTable").show(false)
+      }
+    assert(exception1.getMessage()
+      .contains("Problem in loading segment blocks."))
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
@@ -1025,7 +1032,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     {
       sql("select * from sdkOutputTable").show(false)
     }
     assert(exception.getMessage()
-      .contains("All the files doesn't have same schema"))
+      .contains("Problem in loading segment blocks."))
 
     sql("DROP TABLE sdkOutputTable")
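The assertion updates above capture the new failure behaviour: a schema mismatch across carbondata files now surfaces while the blocklet datamap loads segment blocks, as an IOException with the message "Problem in loading segment blocks.", and the added intercept verifies that count(*), which prunes through the datamap without a full scan, fails the same way. A hedged caller-side sketch follows, assuming the SDK reader exercises the same segment-loading path as the SQL tests above (the tests themselves run through Spark SQL) and using an illustrative output path.

import java.io.IOException;

import org.apache.carbondata.sdk.file.CarbonReader;

public class SchemaMismatchHandlingSketch {
  public static void main(String[] args) throws InterruptedException {
    try {
      // Path is an assumption: point it at SDK output that contains
      // carbondata files written with different schemas.
      CarbonReader reader = CarbonReader.builder("/tmp/carbon-sdk-out", "_temp").build();
      reader.close();
    } catch (IOException e) {
      // With this change the reported message is
      // "Problem in loading segment blocks." rather than
      // "All the files doesn't have same schema".
      System.err.println("Read failed: " + e.getMessage());
    }
  }
}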
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 9d7470e..98aa6e0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
@@ -51,12 +50,6 @@ public class CarbonReaderBuilder {
   private boolean isTransactionalTable;
 
   /**
-   * It will be true if use the projectAllColumns method,
-   * it will be false if use the projection method
-   */
-  private boolean isProjectAllColumns = true;
-
-  /**
    * Construct a CarbonReaderBuilder with table path and table name
    *
    * @param tablePath table path
@@ -76,7 +69,6 @@ public class CarbonReaderBuilder {
   public CarbonReaderBuilder projection(String[] projectionColumnNames) {
     Objects.requireNonNull(projectionColumnNames);
     this.projectionColumns = projectionColumnNames;
-    isProjectAllColumns = false;
     return this;
   }
 
@@ -96,33 +88,6 @@ public class CarbonReaderBuilder {
   }
 
   /**
-   * Project all Columns for carbon reader
-   *
-   * @return CarbonReaderBuilder object
-   * @throws IOException
-   */
-  public CarbonReaderBuilder projectAllColumns() throws IOException {
-    CarbonTable carbonTable = CarbonTable
-        .buildFromTablePath(tableName, tablePath, isTransactionalTable);
-
-    List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
-    List<String> projectColumn = new ArrayList<String>();
-    for (ColumnSchema cols : colList) {
-      if (cols.getSchemaOrdinal() != -1) {
-        projectColumn.add(cols.getColumnUniqueId());
-      }
-    }
-    projectionColumns = new String[projectColumn.size()];
-    int i = 0;
-    for (String columnName : projectColumn) {
-      projectionColumns[i] = columnName;
-      i++;
-    }
-    isProjectAllColumns = true;
-    return this;
-  }
-
-  /**
    * Configure the filter expression for carbon reader
    *
    * @param filterExpression filter expression
@@ -209,8 +174,13 @@
    * @throws InterruptedException
    */
   public <T> CarbonReader<T> build() throws IOException, InterruptedException {
-    CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath, isTransactionalTable);
-
+    // DB name is not applicable for SDK reader as the table will never be registered.
+    CarbonTable table;
+    if (isTransactionalTable) {
+      table = CarbonTable.buildFromTablePath(tableName, "default", tablePath);
+    } else {
+      table = CarbonTable.buildDummyTable(tablePath);
+    }
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
     final Job job = new Job(new Configuration());
     format.setTableInfo(job.getConfiguration(), table.getTableInfo());
@@ -220,10 +190,11 @@
     if (filterExpression != null) {
       format.setFilterPredicates(job.getConfiguration(), filterExpression);
     }
-    if (isProjectAllColumns) {
-      projectAllColumns();
+
+    if (projectionColumns != null) {
+      // set the user projection
+      format.setColumnProjection(job.getConfiguration(), projectionColumns);
     }
-    format.setColumnProjection(job.getConfiguration(), projectionColumns);
 
     final List<InputSplit> splits =
         format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f68a792/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index db118cd..a8aa795 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -385,9 +385,8 @@
     // Write to a Non Transactional Table
     TestUtil.writeFilesAndVerify(new Schema(fields), path, true, false);
 
-    CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
+    CarbonReader reader = CarbonReader.builder(path, "_temp")
         .projection(new String[]{"name", "age"})
-        .isTransactionalTable(false)
         .build();
 
     // expected output after sorting
@@ -892,7 +891,6 @@
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
         .isTransactionalTable(true)
-        .projectAllColumns()
         .build();
 
     // expected output after sorting
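With isProjectAllColumns and the projectAllColumns() builder method removed, projection becomes optional on the SDK side: set one explicitly, or omit it and let getSplits() default to all columns. A minimal usage sketch follows; the output path is an assumption, and the read loop uses the standard CarbonReader iteration API (hasNext/readNextRow/close) shown in the SDK tests.

import org.apache.carbondata.sdk.file.CarbonReader;

public class ReaderUsageSketch {
  public static void main(String[] args) throws Exception {
    // No .projection(...) and no .projectAllColumns() needed any more:
    // when the projection is absent, getSplits() fills in all columns.
    CarbonReader reader = CarbonReader
        .builder("/tmp/carbon-sdk-out", "_temp")
        .build();
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      // process row values here
    }
    reader.close();
  }
}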