fix testcase clean
clean clean fix comment Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/7b8b1959 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/7b8b1959 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/7b8b1959 Branch: refs/heads/master Commit: 7b8b19598bf61001b2c4e07debbe26868f64d95c Parents: b966043 Author: jackylk <jacky.li...@huawei.com> Authored: Fri Dec 30 14:44:14 2016 +0800 Committer: chenliang613 <chenliang...@apache.org> Committed: Wed Jan 4 17:11:54 2017 +0800 ---------------------------------------------------------------------- .../scan/executor/impl/DetailQueryExecutor.java | 13 ++- .../impl/VectorDetailQueryExecutor.java | 11 +- .../scan/executor/util/QueryUtil.java | 4 +- .../carbondata/scan/filter/FilterUtil.java | 6 +- .../AbstractDetailQueryResultIterator.java | 2 +- .../iterator/DetailQueryResultIterator.java | 2 +- .../VectorDetailQueryResultIterator.java | 2 +- .../MalformedCarbonCommandException.java | 14 --- .../carbondata/spark/load/CarbonLoaderUtil.java | 18 ++- .../spark/load/DeleteLoadFolders.java | 54 +-------- .../spark/load/DeletedLoadMetadata.java | 53 --------- .../spark/merger/CarbonCompactionExecutor.java | 35 +----- .../spark/merger/CarbonDataMergerUtil.java | 48 +------- .../spark/merger/RowResultMerger.java | 8 +- .../spark/merger/TupleConversionAdapter.java | 6 +- .../spark/partition/api/DataPartitioner.java | 19 +-- .../spark/partition/api/Partition.java | 5 - .../api/impl/DataPartitionerProperties.java | 87 -------------- .../partition/api/impl/DefaultLoadBalancer.java | 4 - .../spark/partition/api/impl/PartitionImpl.java | 54 --------- .../api/impl/PartitionMultiFileImpl.java | 5 - .../api/impl/QueryPartitionHelper.java | 3 +- .../api/impl/SampleDataPartitionerImpl.java | 117 +------------------ .../carbondata/spark/util/CarbonQueryUtil.java | 14 +-- .../apache/carbondata/spark/CarbonFilters.scala | 104 +++++++++++------ .../spark/rdd/CarbonDataLoadRDD.scala | 10 +- .../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 4 +- .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 36 +++--- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +- .../spark/rdd/DataLoadPartitionCoalescer.scala | 8 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 59 +++++----- .../spark/util/GlobalDictionaryUtil.scala | 5 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 35 +++--- .../CarbonTableIdentifierImplicit.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala | 1 - .../spark/sql/CarbonDatasourceRelation.scala | 10 +- .../spark/sql/CarbonDictionaryDecoder.scala | 2 +- .../scala/org/apache/spark/sql/CarbonScan.scala | 3 +- .../spark/sql/SparkUnknownExpression.scala | 9 +- .../execution/command/carbonTableSchema.scala | 9 +- .../spark/sql/optimizer/CarbonOptimizer.scala | 15 ++- .../DataCompactionBlockletBoundryTest.scala | 3 +- .../DataCompactionBoundaryConditionsTest.scala | 1 + .../DataCompactionCardinalityBoundryTest.scala | 12 +- .../datacompaction/DataCompactionLockTest.scala | 3 +- .../DataCompactionNoDictionaryTest.scala | 5 +- .../datacompaction/DataCompactionTest.scala | 12 +- .../MajorCompactionIgnoreInMinorTest.scala | 1 + .../MajorCompactionStopsAfterCompaction.scala | 1 + .../spark/util/DictionaryTestCaseUtil.scala | 3 +- .../spark/rdd/CarbonDataRDDFactory.scala | 1 - 51 files changed, 248 insertions(+), 696 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java index f2f4b58..7e0f5a9 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java @@ -24,6 +24,7 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.scan.model.QueryModel; +import org.apache.carbondata.scan.result.BatchResult; import org.apache.carbondata.scan.result.iterator.DetailQueryResultIterator; /** @@ -31,13 +32,17 @@ import org.apache.carbondata.scan.result.iterator.DetailQueryResultIterator; * For executing the detail query it will pass all the block execution * info to detail query result iterator and iterator will be returned */ -public class DetailQueryExecutor extends AbstractQueryExecutor { +public class DetailQueryExecutor extends AbstractQueryExecutor<BatchResult> { - @Override public CarbonIterator<Object[]> execute(QueryModel queryModel) + @Override + public CarbonIterator<BatchResult> execute(QueryModel queryModel) throws QueryExecutionException { List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); - return new DetailQueryResultIterator(blockExecutionInfoList, queryModel, - queryProperties.executorService); + return new DetailQueryResultIterator( + blockExecutionInfoList, + queryModel, + queryProperties.executorService + ); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java index 1cbf9c9..884a1eb 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java @@ -29,13 +29,16 @@ import org.apache.carbondata.scan.result.iterator.VectorDetailQueryResultIterato /** * Below class will be used to execute the detail query and returns columnar vectors. */ -public class VectorDetailQueryExecutor extends AbstractQueryExecutor { +public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> { - @Override public CarbonIterator<Object[]> execute(QueryModel queryModel) + @Override public CarbonIterator<Object> execute(QueryModel queryModel) throws QueryExecutionException { List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); - return new VectorDetailQueryResultIterator(blockExecutionInfoList, queryModel, - queryProperties.executorService); + return new VectorDetailQueryResultIterator( + blockExecutionInfoList, + queryModel, + queryProperties.executorService + ); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java index 3f9ebb5..9fdb9ae 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java @@ -336,9 +336,9 @@ public class QueryUtil { getDictionaryColumnUniqueIdentifierList(dictionaryColumnIdList, absoluteTableIdentifier.getCarbonTableIdentifier()); CacheProvider cacheProvider = CacheProvider.getInstance(); - Cache forwardDictionaryCache = cacheProvider + Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath()); - List<Dictionary> columnDictionaryList = null; + List<Dictionary> columnDictionaryList; try { columnDictionaryList = forwardDictionaryCache.getAll(dictionaryColumnUniqueIdentifiers); } catch (CarbonUtilException e) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java index c1985bc..943ea1a 100644 --- a/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java @@ -951,12 +951,12 @@ public final class FilterUtil { new DictionaryColumnUniqueIdentifier(tableIdentifier.getCarbonTableIdentifier(), carbonDimension.getColumnIdentifier(), carbonDimension.getDataType()); CacheProvider cacheProvider = CacheProvider.getInstance(); - Cache forwardDictionaryCache = + Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, tableIdentifier.getStorePath()); // get the forward dictionary object - Dictionary forwardDictionary = null; + Dictionary forwardDictionary; try { - forwardDictionary = (Dictionary) forwardDictionaryCache.get(dictionaryColumnUniqueIdentifier); + forwardDictionary = forwardDictionaryCache.get(dictionaryColumnUniqueIdentifier); } catch (CarbonUtilException e) { throw new QueryExecutionException(e); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java index ff7a93c..f6b6ba1 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -46,7 +46,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch; * executing that query are returning a iterator over block and every time next * call will come it will execute the block and return the result */ -public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { +public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> { /** * LOGGER. http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java index a958195..7d944d6 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java @@ -33,7 +33,7 @@ import org.apache.carbondata.scan.result.BatchResult; * executing that query are returning a iterator over block and every time next * call will come it will execute the block and return the result */ -public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator { +public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> { private final Object lock = new Object(); private Future<BatchResult> future; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java index 8d0e80c..417f597 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java @@ -28,7 +28,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch; /** * It reads the data vector batch format */ -public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator { +public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator<Object> { private final Object lock = new Object(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java index de7d4a2..e97d063 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java @@ -19,9 +19,6 @@ package org.apache.carbondata.spark.exception; - -import java.util.Locale; - public class MalformedCarbonCommandException extends Exception { @@ -56,17 +53,6 @@ public class MalformedCarbonCommandException extends Exception { } /** - * This method is used to get the localized message. - * - * @param locale - A Locale object represents a specific geographical, - * political, or cultural region. - * @return - Localized error message. - */ - public String getLocalizedMessage(Locale locale) { - return ""; - } - - /** * getLocalizedMessage */ @Override http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index eca34a6..419e833 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -241,7 +241,7 @@ public final class CarbonLoaderUtil { } } - public static void deleteStorePath(String path) { + private static void deleteStorePath(String path) { try { FileType fileType = FileFactory.getFileType(path); if (FileFactory.isFileExist(path, fileType)) { @@ -311,7 +311,7 @@ public final class CarbonLoaderUtil { String localStoreLocation = CarbonProperties.getInstance() .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); try { - CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile() }); + CarbonUtil.deleteFoldersAndFiles(new File(localStoreLocation).getParentFile()); LOGGER.info("Deleted the local store location" + localStoreLocation); } catch (CarbonUtilException e) { LOGGER.error(e, "Failed to delete local data load folder location"); @@ -325,15 +325,13 @@ public final class CarbonLoaderUtil { * @param storePath * @param carbonTableIdentifier * @param segmentId - * @param partitionId * @return */ public static String getStoreLocation(String storePath, - CarbonTableIdentifier carbonTableIdentifier, String segmentId, String partitionId) { + CarbonTableIdentifier carbonTableIdentifier, String segmentId) { CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier); - String carbonDataFilePath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId); - return carbonDataFilePath; + return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); } /** @@ -385,9 +383,7 @@ public final class CarbonLoaderUtil { new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); if (null != listOfLoadFolderDetailsArray) { - for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { - listOfLoadFolderDetails.add(loadMetadata); - } + Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray); } listOfLoadFolderDetails.add(loadMetadataDetails); @@ -463,9 +459,9 @@ public final class CarbonLoaderUtil { public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier, String carbonStorePath) throws CarbonUtilException { - Cache dictCache = + Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath); - return (Dictionary) dictCache.get(columnIdentifier); + return dictCache.get(columnIdentifier); } public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java index 0663abc..9f092e7 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java @@ -26,6 +26,7 @@ * Description : for physical deletion of load folders. * Class Version : 1.0 */ + package org.apache.carbondata.spark.load; import java.io.IOException; @@ -158,59 +159,6 @@ public final class DeleteLoadFolders { return false; } - private static void factFileRenaming(String loadFolderPath) { - - FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath); - try { - if (FileFactory.isFileExist(loadFolderPath, fileType)) { - CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType); - - CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile file) { - return (file.getName().endsWith('_' + CarbonCommonConstants.FACT_FILE_UPDATED)); - } - }); - - for (CarbonFile file : listFiles) { - if (!file.renameTo(file.getName().substring(0, - file.getName().length() - CarbonCommonConstants.FACT_FILE_UPDATED.length()))) { - LOGGER.warn("could not rename the updated fact file."); - } - } - - } - } catch (IOException e) { - LOGGER.error("exception" + e.getMessage()); - } - - } - - private static void cleanDeletedFactFile(String loadFolderPath) { - FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath); - try { - if (FileFactory.isFileExist(loadFolderPath, fileType)) { - CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType); - - CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile file) { - return (file.getName().endsWith(CarbonCommonConstants.FACT_DELETE_EXTENSION)); - } - }); - - for (CarbonFile file : listFiles) { - if (!file.delete()) { - LOGGER.warn("could not delete the marked fact file."); - } - } - - } - } catch (IOException e) { - LOGGER.error("exception" + e.getMessage()); - } - } - public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName, String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) { List<LoadMetadataDetails> deletedLoads = http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java deleted file mode 100644 index 661e17c..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.spark.load; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; - -public class DeletedLoadMetadata implements Serializable { - - private static final long serialVersionUID = 7083059404172117208L; - private Map<String, String> deletedLoadMetadataMap = - new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - public void addDeletedLoadMetadata(String loadId, String status) { - deletedLoadMetadataMap.put(loadId, status); - } - - public List<String> getDeletedLoadMetadataIds() { - return new ArrayList<String>(deletedLoadMetadataMap.keySet()); - } - - public String getDeletedLoadMetadataStatus(String loadId) { - if (deletedLoadMetadataMap.containsKey(loadId)) { - return deletedLoadMetadataMap.get(loadId); - } else { - return null; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java index dc5fb17..ef3d2e4 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java @@ -56,41 +56,23 @@ public class CarbonCompactionExecutor { LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName()); private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping; private final SegmentProperties destinationSegProperties; - private final String databaseName; - private final String factTableName; private final Map<String, TaskBlockInfo> segmentMapping; - private final String storePath; private QueryExecutor queryExecutor; private CarbonTable carbonTable; private QueryModel queryModel; /** * Constructor - * - * @param segmentMapping + * @param segmentMapping * @param segmentProperties - * @param databaseName - * @param factTableName - * @param storePath * @param carbonTable */ public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping, - SegmentProperties segmentProperties, String databaseName, String factTableName, - String storePath, CarbonTable carbonTable, + SegmentProperties segmentProperties, CarbonTable carbonTable, Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) { - this.segmentMapping = segmentMapping; - this.destinationSegProperties = segmentProperties; - - this.databaseName = databaseName; - - this.factTableName = factTableName; - - this.storePath = storePath; - this.carbonTable = carbonTable; - this.dataFileMetadataSegMapping = dataFileMetadataSegMapping; } @@ -143,18 +125,9 @@ public class CarbonCompactionExecutor { */ private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException { - queryModel.setTableBlockInfos(blockList); this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); - CarbonIterator<BatchResult> iter = null; - try { - iter = queryExecutor.execute(queryModel); - } catch (QueryExecutionException e) { - LOGGER.error(e.getMessage()); - throw e; - } - - return iter; + return queryExecutor.execute(queryModel); } /** @@ -191,7 +164,7 @@ public class CarbonCompactionExecutor { * @param blockList * @return */ - public QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { + private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { QueryModel model = new QueryModel(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java index 836a757..41709f7 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java @@ -28,15 +28,12 @@ import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -161,7 +158,7 @@ public final class CarbonDataMergerUtil { .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) { LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName() + " is deleted after the compaction is started."); - return tableStatusUpdationStatus; + return false; } loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED); loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp); @@ -225,7 +222,7 @@ public final class CarbonDataMergerUtil { CarbonLoadModel carbonLoadModel, long compactionSize, List<LoadMetadataDetails> segments, CompactionType compactionType) { - List sortedSegments = new ArrayList(segments); + List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments); sortSegments(sortedSegments); @@ -466,7 +463,7 @@ public final class CarbonDataMergerUtil { private static long getSizeOfSegment(String storeLocation, CarbonTableIdentifier tableIdentifier, String segId) { String loadPath = CarbonLoaderUtil - .getStoreLocation(storeLocation, tableIdentifier, segId, "0"); + .getStoreLocation(storeLocation, tableIdentifier, segId); CarbonFile segmentFolder = FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); return getSizeOfFactFileInLoad(segmentFolder); @@ -557,11 +554,9 @@ public final class CarbonDataMergerUtil { */ private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining( List<LoadMetadataDetails> segments) { - - int numberOfSegmentsToBePreserved = 0; // check whether the preserving of the segments from merging is enabled or not. // get the number of loads to be preserved. - numberOfSegmentsToBePreserved = + int numberOfSegmentsToBePreserved = CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved(); // get the number of valid segments and retain the latest loads from merging. return CarbonDataMergerUtil @@ -635,9 +630,9 @@ public final class CarbonDataMergerUtil { for (LoadMetadataDetails segment : loadMetadataDetails) { //check if this load is an already merged load. if (null != segment.getMergedLoadName()) { - builder.append(segment.getMergedLoadName() + ","); + builder.append(segment.getMergedLoadName()).append(","); } else { - builder.append(segment.getLoadName() + ","); + builder.append(segment.getLoadName()).append(","); } } builder.deleteCharAt(builder.length() - 1); @@ -645,37 +640,6 @@ public final class CarbonDataMergerUtil { } /** - * Combining the list of maps to one map. - * - * @param mapsOfNodeBlockMapping - * @return - */ - public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps( - List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) { - - Map<String, List<TableBlockInfo>> combinedMap = - new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - // traverse list of maps. - for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) { - // traverse inside each map. - for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) { - - String node = eachEntry.getKey(); - List<TableBlockInfo> blocks = eachEntry.getValue(); - - // if already that node detail exist in the combined map. - if (null != combinedMap.get(node)) { - List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node); - blocksAlreadyPresent.addAll(blocks); - } else { // if its not present in map then put to map. - combinedMap.put(node, blocks); - } - } - } - return combinedMap; - } - - /** * Removing the already merged segments from list. */ public static List<LoadMetadataDetails> filterOutNewlyAddedSegments( http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java index 44c05a1..5c534fa 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java @@ -63,7 +63,6 @@ public class RowResultMerger { private final String databaseName; private final String tableName; private final String tempStoreLocation; - private final int measureCount; private final String factStoreLocation; private CarbonFactHandler dataHandler; private List<RawResultIterator> rawResultIteratorList = @@ -101,7 +100,7 @@ public class RowResultMerger { this.databaseName = databaseName; this.tableName = tableName; - this.measureCount = segprop.getMeasures().size(); + int measureCount = segprop.getMeasures().size(); CarbonTable carbonTable = CarbonMetadata.getInstance() .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); CarbonFactDataHandlerModel carbonFactDataHandlerModel = @@ -195,7 +194,7 @@ public class RowResultMerger { * * @throws SliceMergerException */ - protected void addRow(Object[] carbonTuple) throws SliceMergerException { + private void addRow(Object[] carbonTuple) throws SliceMergerException { Object[] rowInWritableFormat; rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple); @@ -270,12 +269,11 @@ public class RowResultMerger { */ private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName, String tableName, String partitionId, String segmentId) { - String carbonStorePath = factStoreLocation; CarbonTable carbonTable = CarbonMetadata.getInstance() .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier); + CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java index 94ebf40..35b8164 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java @@ -22,19 +22,16 @@ import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.processing.util.RemoveDictionaryUtil; import org.apache.carbondata.scan.wrappers.ByteArrayWrapper; /** * This class will be used to convert the Result into the format used in data writer. */ -public class TupleConversionAdapter { +class TupleConversionAdapter { private final SegmentProperties segmentproperties; - private final List<CarbonMeasure> measureList; - private int noDictionaryPresentIndex; private int measureCount; @@ -48,7 +45,6 @@ public class TupleConversionAdapter { noDictionaryPresentIndex++; } this.segmentproperties = segmentProperties; - measureList = segmentProperties.getMeasures(); } /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java index 58f3a2d..f097b54 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java @@ -21,15 +21,7 @@ package org.apache.carbondata.spark.partition.api; import java.util.List; -import org.apache.carbondata.scan.model.CarbonQueryPlan; - -import org.apache.spark.sql.execution.command.Partitioner; - public interface DataPartitioner { - /** - * Initialise the partitioner based on the given columns - */ - void initialize(String basePath, String[] columns, Partitioner partitioner); /** * All the partitions built by the Partitioner @@ -37,18 +29,9 @@ public interface DataPartitioner { List<Partition> getAllPartitions(); /** - * Partition where the tuple should be present. (API used for data loading purpose) - */ - Partition getPartionForTuple(Object[] tuple, long rowCounter); - - /** * Identifies the partitions applicable for the given filter (API used for For query) */ - List<Partition> getPartitions(CarbonQueryPlan queryPlan); - - String[] getPartitionedColumns(); - - Partitioner getPartitioner(); + List<Partition> getPartitions(); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java index 61639d3..ca6e7bd 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java @@ -29,11 +29,6 @@ public interface Partition extends Serializable { String getUniqueID(); /** - * File path for the raw data represented by this partition - */ - String getFilePath(); - - /** * result * * @return http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java deleted file mode 100644 index bc6e54f..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.spark.partition.api.impl; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Properties; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; - -public final class DataPartitionerProperties { - private static final LogService LOGGER = - LogServiceFactory.getLogService(DataPartitionerProperties.class.getName()); - - private static DataPartitionerProperties instance; - - private Properties properties; - - private DataPartitionerProperties() { - properties = loadProperties(); - } - - public static DataPartitionerProperties getInstance() { - if (instance == null) { - instance = new DataPartitionerProperties(); - } - return instance; - } - - public String getValue(String key, String defaultVal) { - return properties.getProperty(key, defaultVal); - } - - public String getValue(String key) { - return properties.getProperty(key); - } - - /** - * Read the properties from CSVFilePartitioner.properties - */ - private Properties loadProperties() { - Properties props = new Properties(); - - File file = new File("DataPartitioner.properties"); - FileInputStream fis = null; - try { - if (file.exists()) { - fis = new FileInputStream(file); - - props.load(fis); - } - } catch (Exception e) { - LOGGER - .error(e, e.getMessage()); - } finally { - if (null != fis) { - try { - fis.close(); - } catch (IOException e) { - LOGGER.error(e, - e.getMessage()); - } - } - } - - return props; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java index 9bee8a2..6594c79 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java @@ -59,10 +59,6 @@ public class DefaultLoadBalancer { } } - public List<Partition> getPartitionsForNode(String node) { - return nodeToPartitonMap.get(node); - } - public String getNodeForPartitions(Partition partition) { return partitonToNodeMap.get(partition); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java deleted file mode 100644 index bd7cc42..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.spark.partition.api.impl; - -import java.util.List; - -import org.apache.carbondata.spark.partition.api.Partition; - -public class PartitionImpl implements Partition { - private static final long serialVersionUID = 3020172346383028547L; - private String uniqueID; - private String folderPath; - - - public PartitionImpl(String uniqueID, String folderPath) { - this.uniqueID = uniqueID; - this.folderPath = folderPath; - } - - @Override public String getUniqueID() { - return uniqueID; - } - - @Override public String getFilePath() { - return folderPath; - } - - @Override public String toString() { - return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}'; - } - - @Override public List<String> getFilesPath() { - // TODO Auto-generated method stub - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java index de32b5c..f34e9c9 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java @@ -38,11 +38,6 @@ public class PartitionMultiFileImpl implements Partition { return uniqueID; } - @Override public String getFilePath() { - // TODO Auto-generated method stub - return null; - } - @Override public List<String> getFilesPath() { // TODO Auto-generated method stub return folderPath; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java index e05be7d..a864ebf 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java @@ -52,8 +52,7 @@ public final class QueryPartitionHelper { DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); - List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan); - return queryPartitions; + return dataPartitioner.getPartitions(); } public List<Partition> getAllPartitions(String databaseName, String tableName) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java index c9b434a..21132ab 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java @@ -19,133 +19,24 @@ package org.apache.carbondata.spark.partition.api.impl; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.scan.model.CarbonQueryPlan; import org.apache.carbondata.spark.partition.api.DataPartitioner; import org.apache.carbondata.spark.partition.api.Partition; -import org.apache.spark.sql.execution.command.Partitioner; - /** * Sample partition. */ public class SampleDataPartitionerImpl implements DataPartitioner { - private static final LogService LOGGER = - LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName()); - private int numberOfPartitions = 1; - - private int partionColumnIndex = -1; - - private String partitionColumn; - - private Partitioner partitioner; - private List<Partition> allPartitions; - private String baseLocation; - - public SampleDataPartitionerImpl() { - } - - public void initialize(String basePath, String[] columns, Partitioner partitioner) { - this.partitioner = partitioner; - numberOfPartitions = partitioner.partitionCount(); - - partitionColumn = partitioner.partitionColumn()[0]; - LOGGER.info("SampleDataPartitionerImpl initializing with following properties."); - LOGGER.info("partitionCount: " + numberOfPartitions); - LOGGER.info("partitionColumn: " + partitionColumn); - LOGGER.info("basePath: " + basePath); - LOGGER.info("columns: " + Arrays.toString(columns)); - - this.baseLocation = basePath; - allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - - for (int i = 0; i < columns.length; i++) { - if (columns[i].equalsIgnoreCase(partitionColumn)) { - partionColumnIndex = i; - break; - } - } - - for (int partionCounter = 0; partionCounter < numberOfPartitions; partionCounter++) { - PartitionImpl partitionImpl = - new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter); - - List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - includedHashes.add(partionCounter); - - allPartitions.add(partitionImpl); - } - } - - @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) { - int hashCode; - if (partionColumnIndex == -1) { - hashCode = hashCode(rowCounter); - } else { - try { - hashCode = hashCode(((String) tuple[partionColumnIndex]).hashCode()); - } catch (NumberFormatException e) { - hashCode = hashCode(0); - } - } - return allPartitions.get(hashCode); - } - /** - * - */ + @Override public List<Partition> getAllPartitions() { - return allPartitions; + return null; } - /** - * @see DataPartitioner#getPartitions(CarbonQueryPlan) - */ - public List<Partition> getPartitions(CarbonQueryPlan queryPlan) { - // TODO: this has to be redone during partitioning implmentatation - return allPartitions; - } - - /** - * Identify the partitions applicable for the given filter - */ + @Override public List<Partition> getPartitions() { - return allPartitions; - - // TODO: this has to be redone during partitioning implementation - // for (Partition aPartition : allPartitions) { - // CarbonDimensionLevelFilter partitionFilterDetails = - // aPartition.getPartitionDetails().get(partitionColumn); - // - // //Check if the partition is serving any of the - // //hash code generated for include filter of query - // for (Object includeFilter : msisdnFilter.getIncludeFilter()) { - // int hashCode = hashCode(((String) includeFilter).hashCode()); - // if (partitionFilterDetails.getIncludeFilter().contains(hashCode)) { - // allowedPartitions.add(aPartition); - // break; - // } - // } - // } - - } - - private int hashCode(long key) { - return (int) (Math.abs(key) % numberOfPartitions); - } - - @Override public String[] getPartitionedColumns() { - return new String[] { partitionColumn }; - } - - @Override public Partitioner getPartitioner() { - return partitioner; + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java index 9d1a281..94e475e 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java @@ -19,8 +19,8 @@ package org.apache.carbondata.spark.util; -import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,7 +49,7 @@ public class CarbonQueryUtil { * It creates the one split for each region server. */ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName, - CarbonQueryPlan queryPlan) throws IOException { + CarbonQueryPlan queryPlan) { //Just create splits depends on locations of region servers List<Partition> allPartitions = null; @@ -78,7 +78,7 @@ public class CarbonQueryUtil { /** * It creates the one split for each region server. */ - public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception { + public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) { //Just create splits depends on locations of region servers DefaultLoadBalancer loadBalancer = null; @@ -104,13 +104,11 @@ public class CarbonQueryUtil { String separator) { if (StringUtils.isNotEmpty(sourcePath)) { String[] files = sourcePath.split(separator); - for (String file : files) { - partitionsFiles.add(file); - } + Collections.addAll(partitionsFiles, files); } } - private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception { + private static List<Partition> getAllFilesForDataLoad(String sourcePath) { List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN); splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA); List<Partition> partitionList = @@ -121,7 +119,7 @@ public class CarbonQueryUtil { partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0))); for (int i = 0; i < files.size(); i++) { - partitionFiles.get(i % 1).add(files.get(i)); + partitionFiles.get(0).add(files.get(i)); } return partitionList; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala index 5e58235..6115519 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala @@ -151,31 +151,30 @@ object CarbonFilters { Some(sources.EqualTo(a.name, v)) case EqualTo(Literal(v, t), Cast(a: Attribute, _)) => Some(sources.EqualTo(a.name, v)) - - case Not(EqualTo(a: Attribute, Literal(v, t))) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Literal(v, t), a: Attribute)) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name)) - case IsNull(a: Attribute) => Some(sources.IsNull(a.name)) + case Not(EqualTo(a: Attribute, Literal(v, t))) => + Some(sources.Not(sources.EqualTo(a.name, v))) + case Not(EqualTo(Literal(v, t), a: Attribute)) => + Some(sources.Not(sources.EqualTo(a.name, v))) + case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => + Some(sources.Not(sources.EqualTo(a.name, v))) + case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => + Some(sources.Not(sources.EqualTo(a.name, v))) + case IsNotNull(a: Attribute) => + Some(sources.IsNotNull(a.name)) + case IsNull(a: Attribute) => + Some(sources.IsNull(a.name)) case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) => val hSet = list.map(e => e.eval(EmptyRow)) Some(sources.Not(sources.In(a.name, hSet.toArray))) case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) => val hSet = list.map(e => e.eval(EmptyRow)) Some(sources.In(a.name, hSet.toArray)) - case Not(In(Cast(a: Attribute, _), list)) - if !list.exists(!_.isInstanceOf[Literal]) => + case Not(In(Cast(a: Attribute, _), list)) if !list.exists(!_.isInstanceOf[Literal]) => val hSet = list.map(e => e.eval(EmptyRow)) Some(sources.Not(sources.In(a.name, hSet.toArray))) case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) => val hSet = list.map(e => e.eval(EmptyRow)) Some(sources.In(a.name, hSet.toArray)) - case GreaterThan(a: Attribute, Literal(v, t)) => Some(sources.GreaterThan(a.name, v)) case GreaterThan(Literal(v, t), a: Attribute) => @@ -184,7 +183,6 @@ object CarbonFilters { Some(sources.GreaterThan(a.name, v)) case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) => Some(sources.LessThan(a.name, v)) - case LessThan(a: Attribute, Literal(v, t)) => Some(sources.LessThan(a.name, v)) case LessThan(Literal(v, t), a: Attribute) => @@ -193,7 +191,6 @@ object CarbonFilters { Some(sources.LessThan(a.name, v)) case LessThan(Literal(v, t), Cast(a: Attribute, _)) => Some(sources.GreaterThan(a.name, v)) - case GreaterThanOrEqual(a: Attribute, Literal(v, t)) => Some(sources.GreaterThanOrEqual(a.name, v)) case GreaterThanOrEqual(Literal(v, t), a: Attribute) => @@ -202,7 +199,6 @@ object CarbonFilters { Some(sources.GreaterThanOrEqual(a.name, v)) case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) => Some(sources.LessThanOrEqual(a.name, v)) - case LessThanOrEqual(a: Attribute, Literal(v, t)) => Some(sources.LessThanOrEqual(a.name, v)) case LessThanOrEqual(Literal(v, t), a: Attribute) => @@ -248,23 +244,63 @@ object CarbonFilters { (transformExpression(left) ++ transformExpression(right)).reduceOption(new AndExpression(_, _)) - case EqualTo(a: Attribute, l@Literal(v, t)) => new - Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get)) - case EqualTo(l@Literal(v, t), a: Attribute) => new - Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get)) - case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new - Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get)) - case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new - Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get)) - - case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new - Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get)) - case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new - Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get)) - case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new - Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get)) - case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new - Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get)) + case EqualTo(a: Attribute, l@Literal(v, t)) => + Some( + new EqualToExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + case EqualTo(l@Literal(v, t), a: Attribute) => + Some( + new EqualToExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => + Some( + new EqualToExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => + Some( + new EqualToExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + + case Not(EqualTo(a: Attribute, l@Literal(v, t))) => + Some( + new NotEqualsExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + case Not(EqualTo(l@Literal(v, t), a: Attribute)) => + Some( + new NotEqualsExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => + Some( + new NotEqualsExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) + case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => + Some( + new NotEqualsExpression( + transformExpression(a).get, + transformExpression(l).get + ) + ) case IsNotNull(child: Attribute) => Some(new NotEqualsExpression(transformExpression(child).get, transformExpression(Literal(null)).get, true)) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala index 5d6a663..57087f3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala @@ -231,8 +231,8 @@ class DataFileLoaderRDD[K, V]( var partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() var model: CarbonLoadModel = _ - var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + - theSplit.index + val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + + theSplit.index try { loadMetadataDetails.setPartitionCount(partitionID) carbonLoadModel.setSegmentId(String.valueOf(loadCount)) @@ -500,10 +500,10 @@ class DataFrameLoaderRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val resultIter = new Iterator[(K, V)] { - var partitionID = "0" + val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() - var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + - theSplit.index + val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + + theSplit.index try { loadMetadataDetails.setPartitionCount(partitionID) carbonLoadModel.setPartitionId(partitionID) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala index 0534def..33f453b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala @@ -25,7 +25,6 @@ import org.apache.spark.rdd.RDD import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.load.LoadMetadataDetails import org.apache.carbondata.spark.DeletedLoadResult -import org.apache.carbondata.spark.load.DeletedLoadMetadata import org.apache.carbondata.spark.util.CarbonQueryUtil class CarbonDeleteLoadByDateRDD[K, V]( @@ -53,7 +52,6 @@ class CarbonDeleteLoadByDateRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { new Iterator[(K, V)] { - val deletedMetaData = new DeletedLoadMetadata() val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) @@ -69,7 +67,7 @@ class CarbonDeleteLoadByDateRDD[K, V]( case e: Exception => logInfo("Unable to parse with default time format " + dateValue) } // TODO: Implement it - var finished = false + val finished = false override def hasNext: Boolean = { finished http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 3c15818..9d9fa99 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -23,7 +23,7 @@ import java.text.SimpleDateFormat import java.util.regex.Pattern import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.ArrayBuffer import scala.util.control.Breaks.{break, breakable} import au.com.bytecode.opencsv.CSVReader @@ -34,17 +34,19 @@ import org.apache.spark.sql.Row import org.apache.carbondata.common.factory.CarbonCommonFactory import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.dictionary.Dictionary import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier} import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension +import org.apache.carbondata.core.carbon.path.CarbonTablePath import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastorage.store.impl.FileFactory +import org.apache.carbondata.core.service.PathService import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask} -import org.apache.carbondata.spark.util.CarbonScalaUtil -import org.apache.carbondata.spark.util.GlobalDictionaryUtil +import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil} /** * A partitioner partition by column. @@ -70,9 +72,9 @@ case class DictionaryStats(distinctValues: java.util.List[String], dictWriteTime: Long, sortIndexWriteTime: Long) case class PrimitiveParser(dimension: CarbonDimension, - setOpt: Option[HashSet[String]]) extends GenericParser { - val (hasDictEncoding, set: HashSet[String]) = setOpt match { - case None => (false, new HashSet[String]) + setOpt: Option[mutable.HashSet[String]]) extends GenericParser { + val (hasDictEncoding, set: mutable.HashSet[String]) = setOpt match { + case None => (false, new mutable.HashSet[String]) case Some(x) => (true, x) } @@ -183,7 +185,7 @@ class CarbonAllDictionaryCombineRDD( ): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])] + val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])] /* * for all dictionary, all columns need to encoding and checking * isHighCardinalityColumn, so no need to calculate rowcount @@ -194,7 +196,7 @@ class CarbonAllDictionaryCombineRDD( GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList) val dimNum = model.dimensions.length // Map[dimColName -> dimColNameIndex] - val columnIndexMap = new HashMap[String, Int]() + val columnIndexMap = new mutable.HashMap[String, Int]() for (j <- 0 until dimNum) { columnIndexMap.put(model.dimensions(j).getColName, j) } @@ -245,7 +247,7 @@ class CarbonBlockDistinctValuesCombineRDD( context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime() - val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])] + val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])] var rowCount = 0L try { val dimensionParsers = @@ -297,15 +299,15 @@ class CarbonGlobalDictionaryGenerateRDD( override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS + val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS var isHighCardinalityColumn = false val iter = new Iterator[(Int, String, Boolean)] { - var dictionaryForDistinctValueLookUp: - org.apache.carbondata.core.cache.dictionary.Dictionary = _ - var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _ + var dictionaryForDistinctValueLookUp: Dictionary = _ + var dictionaryForSortIndexWriting: Dictionary = _ var dictionaryForDistinctValueLookUpCleared: Boolean = false - val pathService = CarbonCommonFactory.getPathService - val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table) + val pathService: PathService = CarbonCommonFactory.getPathService + val carbonTablePath: CarbonTablePath = + pathService.getCarbonTablePath(model.hdfsLocation, model.table) if (StringUtils.isNotBlank(model.hdfsTempLocation )) { CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, model.hdfsTempLocation) @@ -538,8 +540,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel, } } } - val mapIdWithSet = new HashMap[String, HashSet[String]] - val columnValues = new HashSet[String] + val mapIdWithSet = new mutable.HashMap[String, mutable.HashSet[String]] + val columnValues = new mutable.HashSet[String] val distinctValues = (theSplit.index, columnValues) mapIdWithSet.put(primDimension.getColumnId, columnValues) // use parser to generate new dict value http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index e740caa..94fa601 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -123,10 +123,8 @@ class CarbonMergerRDD[K, V]( carbonLoadModel.setStorePath(storePath) - exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName, - factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, - dataFileMetadataSegMapping - ) + exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, dataFileMetadataSegMapping) // fire a query and get the results. var result2: util.List[RawResultIterator] = null http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala index 77402b4..02718e0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala @@ -71,7 +71,7 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val prevPartitions = prev.partitions - var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length)) + val numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length)) // host => partition id list val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]] // partition id => host list @@ -268,7 +268,7 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) { prevPartIndexs(i % numOfParts) += prevPartitions(i).index } prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x => - new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2)) + CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2)) } } @@ -312,8 +312,8 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) { } else { Some(emptyHosts(index - localityResult.length)) } - LOGGER.info(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ") - new CoalescedRDDPartition(index, prev, ids, loc) + LOGGER.info(s"CoalescedRDDPartition $index, ${ids.length}, $loc ") + CoalescedRDDPartition(index, prev, ids, loc) }.filter(_.parentsIndices.nonEmpty).toArray } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 64b8b61..22d8406 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -144,7 +144,7 @@ class NewCarbonDataLoadRDD[K, V]( var partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() var model: CarbonLoadModel = _ - var uniqueLoadStatusId = + val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { loadMetadataDetails.setPartitionCount(partitionID) @@ -279,38 +279,35 @@ class NewCarbonDataLoadRDD[K, V]( } override def getPreferredLocations(split: Partition): Seq[String] = { - isTableSplitPartition match { - case true => - // for table split partition - val theSplit = split.asInstanceOf[CarbonTableSplitPartition] - val location = theSplit.serializableHadoopSplit.value.getLocations.asScala - location - case false => - // for node partition - val theSplit = split.asInstanceOf[CarbonNodePartition] - val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit) - logInfo("Preferred Location for split : " + firstOptionLocation.head) - val blockMap = new util.LinkedHashMap[String, Integer]() - val tableBlocks = theSplit.blocksDetails - tableBlocks.foreach { tableBlock => - tableBlock.getLocations.foreach { location => - if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) { - val currentCount = blockMap.get(location) - if (currentCount == null) { - blockMap.put(location, 1) - } else { - blockMap.put(location, currentCount + 1) - } + if (isTableSplitPartition) { + val theSplit = split.asInstanceOf[CarbonTableSplitPartition] + val location = theSplit.serializableHadoopSplit.value.getLocations.asScala + location + } else { + val theSplit = split.asInstanceOf[CarbonNodePartition] + val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit) + logInfo("Preferred Location for split : " + firstOptionLocation.head) + val blockMap = new util.LinkedHashMap[String, Integer]() + val tableBlocks = theSplit.blocksDetails + tableBlocks.foreach { tableBlock => + tableBlock.getLocations.foreach { location => + if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) { + val currentCount = blockMap.get(location) + if (currentCount == null) { + blockMap.put(location, 1) + } else { + blockMap.put(location, currentCount + 1) } } } + } - val sortedList = blockMap.entrySet().asScala.toSeq.sortWith {(nodeCount1, nodeCount2) => - nodeCount1.getValue > nodeCount2.getValue - } + val sortedList = blockMap.entrySet().asScala.toSeq.sortWith { (nodeCount1, nodeCount2) => + nodeCount1.getValue > nodeCount2.getValue + } - val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2) - firstOptionLocation ++ sortedNodesList + val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2) + firstOptionLocation ++ sortedNodesList } } } @@ -333,10 +330,10 @@ class NewDataFrameLoaderRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - var partitionID = "0" + val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() - var model: CarbonLoadModel = carbonLoadModel - var uniqueLoadStatusId = + val model: CarbonLoadModel = carbonLoadModel + val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 5d6badd..84c71ed 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -328,7 +328,7 @@ object GlobalDictionaryUtil { if (null == carbonLoadModel.getLoadMetadataDetails) { CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation) } - new DictionaryLoadModel(table, + DictionaryLoadModel(table, dimensions, hdfsLocation, dictfolderPath, @@ -345,7 +345,8 @@ object GlobalDictionaryUtil { hdfsTempLocation, lockType, zookeeperUrl, - serializationNullFormat) + serializationNullFormat + ) } /**