[CARBONDATA-1739] Clean up store path interface This closes #1509
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5fc7f06f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5fc7f06f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5fc7f06f Branch: refs/heads/master Commit: 5fc7f06f23e944719b2735b97176d68fe209ad75 Parents: b6777fc Author: Jacky Li <jacky.li...@qq.com> Authored: Thu Nov 16 19:41:19 2017 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Fri Nov 17 14:46:19 2017 +0800 ---------------------------------------------------------------------- .../dictionary/ManageDictionaryAndBTree.java | 2 +- .../core/metadata/CarbonMetadata.java | 2 +- .../core/metadata/schema/table/CarbonTable.java | 4 +- .../core/mutate/CarbonUpdateUtil.java | 8 +- .../carbondata/core/scan/model/QueryModel.java | 4 +- .../carbondata/core/util/CarbonProperties.java | 7 + .../core/metadata/CarbonMetadataTest.java | 9 +- .../metadata/schema/table/CarbonTableTest.java | 3 +- .../table/CarbonTableWithComplexTypesTest.java | 2 +- .../carbondata/examples/StreamExample.scala | 4 +- .../carbondata/hadoop/CarbonInputFormat.java | 2 +- .../hadoop/api/CarbonTableInputFormat.java | 4 +- .../streaming/CarbonStreamRecordReader.java | 10 +- .../streaming/CarbonStreamRecordWriter.java | 4 +- .../hadoop/util/CarbonInputFormatUtil.java | 6 +- .../hadoop/test/util/StoreCreator.java | 4 +- .../presto/impl/CarbonTableReader.java | 2 +- .../presto/util/CarbonDataStoreCreator.scala | 4 +- .../TestPreAggregateTableSelection.scala | 2 +- .../partition/TestDDLForPartitionTable.scala | 6 +- ...ForPartitionTableWithDefaultProperties.scala | 8 +- .../carbondata/spark/load/ValidateUtil.scala | 4 +- .../spark/rdd/AlterTableLoadPartitionRDD.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +- .../carbondata/spark/rdd/PartitionDropper.scala | 2 +- .../spark/rdd/PartitionSplitter.scala | 2 +- .../carbondata/spark/util/CommonUtil.scala | 32 +---- .../carbondata/spark/util/DataLoadingUtil.scala | 8 +- .../spark/util/GlobalDictionaryUtil.scala | 12 +- .../command/carbonTableSchemaCommon.scala | 4 +- .../spark/rdd/CarbonDataRDDFactory.scala | 12 +- .../carbondata/spark/util/CarbonSparkUtil.scala | 18 ++- .../spark/sql/CarbonDataFrameWriter.scala | 3 +- .../sql/CarbonDatasourceHadoopRelation.scala | 4 +- .../spark/sql/CarbonDictionaryDecoder.scala | 16 +-- .../scala/org/apache/spark/sql/CarbonEnv.scala | 36 ++++- .../scala/org/apache/spark/sql/CarbonScan.scala | 6 +- .../org/apache/spark/sql/CarbonSource.scala | 20 +-- .../command/CarbonCreateTableCommand.scala | 4 +- .../CarbonDescribeFormattedCommand.scala | 20 +-- .../command/CarbonDropTableCommand.scala | 9 +- .../datamap/CarbonDataMapShowCommand.scala | 4 +- .../datamap/CarbonDropDataMapCommand.scala | 31 +++-- .../AlterTableCompactionCommand.scala | 12 +- .../management/CarbonShowLoadsCommand.scala | 4 +- .../command/management/CleanFilesCommand.scala | 10 +- .../management/DeleteLoadByIdCommand.scala | 4 +- .../DeleteLoadByLoadDateCommand.scala | 4 +- .../management/LoadTableByInsertCommand.scala | 2 +- .../command/management/LoadTableCommand.scala | 62 ++++----- .../command/mutation/DeleteExecution.scala | 13 +- .../command/mutation/HorizontalCompaction.scala | 8 +- .../command/mutation/IUDCommonUtil.scala | 2 +- .../mutation/ProjectForDeleteCommand.scala | 7 +- .../mutation/ProjectForUpdateCommand.scala | 11 +- .../AlterTableDropCarbonPartitionCommand.scala | 19 +-- .../AlterTableSplitCarbonPartitionCommand.scala | 19 +-- .../partition/ShowCarbonPartitionsCommand.scala | 7 +- .../CreatePreAggregateTableCommand.scala | 7 +- .../preaaggregate/PreAggregateListeners.scala | 6 +- .../preaaggregate/PreAggregateUtil.scala | 37 +++--- .../CarbonAlterTableAddColumnCommand.scala | 4 +- .../CarbonAlterTableDataTypeChangeCommand.scala | 4 +- .../CarbonAlterTableDropColumnCommand.scala | 4 +- .../schema/CarbonAlterTableRenameCommand.scala | 7 +- .../strategy/CarbonLateDecodeStrategy.scala | 4 +- .../sql/execution/strategy/DDLStrategy.scala | 11 +- .../strategy/StreamingTableStrategy.scala | 3 +- .../spark/sql/hive/CarbonFileMetastore.scala | 61 ++++----- .../spark/sql/hive/CarbonHiveMetaStore.scala | 13 +- .../apache/spark/sql/hive/CarbonMetaStore.scala | 4 +- .../sql/hive/CarbonPreAggregateRules.scala | 2 +- .../apache/spark/sql/hive/CarbonRelation.scala | 26 ++-- .../spark/sql/hive/CarbonSessionState.scala | 13 +- .../execution/command/CarbonHiveCommands.scala | 4 +- .../org/apache/spark/util/AlterTableUtil.scala | 36 ++--- .../org/apache/spark/util/CleanFiles.scala | 5 +- .../apache/spark/util/DeleteSegmentByDate.scala | 5 +- .../apache/spark/util/DeleteSegmentById.scala | 4 +- .../partition/TestAlterPartitionTable.scala | 32 ++--- .../spark/util/AllDictionaryTestCase.scala | 16 +-- .../spark/util/DictionaryTestCaseUtil.scala | 6 +- .../util/ExternalColumnDictionaryTestCase.scala | 16 +-- .../loading/DataLoadProcessBuilder.java | 6 +- .../merger/CarbonCompactionExecutor.java | 4 +- .../processing/merger/CarbonCompactionUtil.java | 4 +- .../processing/merger/CarbonDataMergerUtil.java | 8 +- .../carbondata/processing/merger/TableMeta.java | 42 ------ .../spliter/AbstractCarbonQueryExecutor.java | 4 +- .../partition/spliter/RowResultProcessor.java | 2 +- .../store/CarbonFactDataHandlerColumnar.java | 130 ------------------- .../processing/store/file/FileData.java | 52 -------- .../processing/store/file/FileManager.java | 59 --------- .../store/file/IFileManagerComposite.java | 57 -------- .../store/writer/AbstractFactDataWriter.java | 4 - .../store/writer/CarbonDataWriterVo.java | 65 ---------- .../util/CarbonDataProcessorUtil.java | 2 +- .../processing/util/CarbonLoaderUtil.java | 5 + .../carbondata/processing/StoreCreator.java | 4 +- .../streaming/segment/StreamSegment.java | 16 +-- .../streaming/StreamSinkFactory.scala | 2 +- .../CarbonStreamingQueryListener.scala | 6 +- 102 files changed, 423 insertions(+), 911 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java index a6c89e0..f8d2495 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java @@ -112,7 +112,7 @@ public class ManageDictionaryAndBTree { } // clear dictionary cache from LRU cache List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); for (CarbonDimension dimension : dimensions) { removeDictionaryColumnFromCache(carbonTable.getAbsoluteTableIdentifier(), dimension.getColumnId()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java index 75fe78b..2face7c 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java @@ -120,7 +120,7 @@ public final class CarbonMetadata { public CarbonDimension getCarbonDimensionBasedOnColIdentifier(CarbonTable carbonTable, String columnIdentifier) { List<CarbonDimension> listOfCarbonDims = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); for (CarbonDimension dimension : listOfCarbonDims) { if (dimension.getColumnId().equals(columnIdentifier)) { return dimension; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index f76ddc9..ac580cd 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -347,7 +347,7 @@ public class CarbonTable implements Serializable { /** * @return the tabelName */ - public String getFactTableName() { + public String getTableName() { return absoluteTableIdentifier.getCarbonTableIdentifier().getTableName(); } @@ -569,7 +569,7 @@ public class CarbonTable implements Serializable { } public boolean isPartitionTable() { - return null != tablePartitionMap.get(getFactTableName()); + return null != tablePartitionMap.get(getTableName()); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index 29cf62a..0b531dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -208,7 +208,7 @@ public class CarbonUpdateUtil { lockStatus = carbonLock.lockWithRetries(); if (lockStatus) { LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName() + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = @@ -257,18 +257,18 @@ public class CarbonUpdateUtil { status = true; } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getTableName()); } } finally { if (lockStatus) { if (carbonLock.unlock()) { LOGGER.info( "Table unlocked successfully after table status updation" + table.getDatabaseName() - + "." + table.getFactTableName()); + + "." + table.getTableName()); } else { LOGGER.error( "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during table status updation"); + .getTableName() + " during table status updation"); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java index 66dfa61..67b8681 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -122,7 +122,7 @@ public class QueryModel implements Serializable { public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier, CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) { QueryModel queryModel = new QueryModel(); - String factTableName = carbonTable.getFactTableName(); + String factTableName = carbonTable.getTableName(); queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier); fillQueryModel(queryPlan, carbonTable, queryModel, factTableName); @@ -141,7 +141,7 @@ public class QueryModel implements Serializable { if (null != queryPlan.getFilterExpression()) { boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; boolean[] isFilterMeasures = - new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())]; + new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())]; processFilterExpression(queryPlan.getFilterExpression(), carbonTable.getDimensionByTableName(factTableName), carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 678a6f7..436950b 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -482,6 +482,13 @@ public final class CarbonProperties { } /** + * Return the store path + */ + public static String getStorePath() { + return getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION); + } + + /** * This method will be used to get the properties value * * @param key http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java index 0de160a..5361fb0 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -94,7 +93,7 @@ public class CarbonMetadataTest { @Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() { String expectedResult = "carbonTestTable"; - assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getFactTableName()); + assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName()); } @Test public void testGetCarbonTableReturingProperTableWithProperTableUniqueName() { @@ -171,7 +170,7 @@ public class CarbonMetadataTest { carbonDimensions.add(new CarbonDimension(colSchema1, 1, 1, 2, 1)); carbonDimensions.add(new CarbonDimension(colSchema2, 2, 2, 2, 2)); new MockUp<CarbonTable>() { - @Mock public String getFactTableName() { + @Mock public String getTableName() { return "carbonTestTable"; } @@ -200,7 +199,7 @@ public class CarbonMetadataTest { colSchema2.setColumnUniqueId("2"); carbonChildDimensions.add(new CarbonDimension(colSchema3, 1, 1, 2, 1)); new MockUp<CarbonTable>() { - @Mock public String getFactTableName() { + @Mock public String getTableName() { return "carbonTestTable"; } @@ -242,7 +241,7 @@ public class CarbonMetadataTest { carbonChildDimensions.add(new CarbonDimension(colSchema2, 1, 1, 2, 1)); new MockUp<CarbonTable>() { - @Mock public String getFactTableName() { + @Mock public String getTableName() { return "carbonTestTable"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java index 8b66233..a47b7fd 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; @@ -57,7 +56,7 @@ public class CarbonTableTest extends TestCase { } @Test public void testFactTableNameReturnsProperFactTableName() { - assertEquals("carbonTestTable", carbonTable.getFactTableName()); + assertEquals("carbonTestTable", carbonTable.getTableName()); } @Test public void testTableUniqueNameIsProper() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java index e9caf4a..84312cd 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java @@ -55,7 +55,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase { } @Test public void testFactTableNameReturnsProperFactTableName() { - assertEquals("carbonTestTable", carbonTable.getFactTableName()); + assertEquals("carbonTestTable", carbonTable.getTableName()); } @Test public void testTableUniqueNameIsProper() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala index 4b59aad..43d545d 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala @@ -89,9 +89,7 @@ object StreamExample { | """.stripMargin) } - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. - lookupRelation(Some("default"), streamTableName)(spark).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) // batch load val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv" http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 0aa2974..88d8341 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -364,7 +364,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { TableProvider tableProvider = new SingleTableProvider(carbonTable); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); BitSet matchedPartitions = null; - PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName()); if (partitionInfo != null) { // prune partitions for filter query on partition table matchedPartitions = setMatchedPartitions(null, carbonTable, filter, partitionInfo); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 6e840e2..552455a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -393,7 +393,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { Expression filter = getFilterPredicates(job.getConfiguration()); TableProvider tableProvider = new SingleTableProvider(carbonTable); // this will be null in case of corrupt schema file. - PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName()); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); // prune partitions for filter query on partition table @@ -787,7 +787,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { Expression filter = getFilterPredicates(configuration); boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; boolean[] isFilterMeasures = - new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())]; + new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())]; CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions, isFilterMeasures); queryModel.setIsFilterDimensions(isFilterDimensions); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java index a22461d..bdd7c28 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java @@ -153,13 +153,13 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { } carbonTable = model.getTable(); List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); dimensionCount = dimensions.size(); List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + carbonTable.getMeasureByTableName(carbonTable.getTableName()); measureCount = measures.size(); List<CarbonColumn> carbonColumnList = - carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName()); + carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName()); storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]); isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns); directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length]; @@ -224,8 +224,8 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { private void initializeFilter() { List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil - .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()), - carbonTable.getMeasureByTableName(carbonTable.getFactTableName())); + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; for (int i = 0; i < dimLensWithComplex.length; i++) { dimLensWithComplex[i] = Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java index 7df87e3..fdd0504 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java @@ -251,8 +251,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { private void writeFileHeader() throws IOException { List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil - .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()), - carbonTable.getMeasureByTableName(carbonTable.getFactTableName())); + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; for (int i = 0; i < dimLensWithComplex.length; i++) { dimLensWithComplex[i] = Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 630828a..3afad94 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -52,7 +52,7 @@ public class CarbonInputFormatUtil { if (columnString != null) { columns = columnString.split(","); } - String factTableName = carbonTable.getFactTableName(); + String factTableName = carbonTable.getTableName(); CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), factTableName); // fill dimensions // If columns are null, set all dimensions and measures @@ -120,9 +120,9 @@ public class CarbonInputFormatUtil { public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable, boolean[] isFilterDimensions, boolean[] isFilterMeasures) { List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + carbonTable.getMeasureByTableName(carbonTable.getTableName()); QueryModel.processFilterExpression(filterExpression, dimensions, measures, isFilterDimensions, isFilterMeasures); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index b4145ef..c45f910 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -309,9 +309,9 @@ public class StoreCreator { String header = reader.readLine(); String[] split = header.split(","); List<CarbonColumn> allCols = new ArrayList<CarbonColumn>(); - List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName()); + List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName()); allCols.addAll(dims); - List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName()); + List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName()); allCols.addAll(msrs); Set<String>[] set = new HashSet[dims.size()]; for (int i = 0; i < set.length; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index 8e6abd4..f72bb7a 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -363,7 +363,7 @@ public class CarbonTableReader { .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), null).getPath(); config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName()); - config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getFactTableName()); + config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName()); try { CarbonTableInputFormat.setTableInfo(config, tableInfo); http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index 17a4188..1430baf 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -333,10 +333,10 @@ object CarbonDataStoreCreator { val split: Array[String] = header.split(",") val allCols: util.List[CarbonColumn] = new util.ArrayList[CarbonColumn]() val dims: util.List[CarbonDimension] = - table.getDimensionByTableName(table.getFactTableName) + table.getDimensionByTableName(table.getTableName) allCols.addAll(dims) val msrs: List[CarbonMeasure] = - table.getMeasureByTableName(table.getFactTableName) + table.getMeasureByTableName(table.getTableName) allCols.addAll(msrs) val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size) for (i <- set.indices) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index 6b435c6..1d41664 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -147,7 +147,7 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { case logicalRelation:LogicalRelation => if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) { val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] - if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) { + if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { isValidPlan = true } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala index 3f99922..df1bd2e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala @@ -51,7 +51,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll { """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT) @@ -74,7 +74,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll { """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP) @@ -101,7 +101,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll { | 'LIST_INFO'='0, 1, (2, 3)') """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.STRING) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala index 317e2e2..c17ca00 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala @@ -45,7 +45,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT) @@ -68,7 +68,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP) @@ -96,7 +96,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo | 'DICTIONARY_INCLUDE'='projectenddate') """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP) @@ -128,7 +128,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo | 'LIST_INFO'='2017-06-11 , 2017-06-13') """.stripMargin) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTableDate") - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) assert(partitionInfo != null) assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate")) assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.DATE) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala index 8eb5101..51e0cc4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala @@ -51,8 +51,8 @@ object ValidateUtil { def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = { if (sortScope != null) { // Don't support use global sort on partitioned table. - if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null && - sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) { + if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null && + sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) { throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " + "table.") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala index 5c6760a..37ab8c3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -46,7 +46,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, val oldPartitionIds = alterPartitionModel.oldPartitionIds val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val databaseName = carbonTable.getDatabaseName - val factTableName = carbonTable.getFactTableName + val factTableName = carbonTable.getTableName val partitionInfo = carbonTable.getPartitionInfo(factTableName) override protected def getPartitions: Array[Partition] = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 9ca21bc..0fed5a7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -548,7 +548,7 @@ class PartitionTableDataLoaderRDD[K, V]( val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") val model: CarbonLoadModel = carbonLoadModel val carbonTable = model.getCarbonDataLoadSchema.getCarbonTable - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala index a82ea00..2aa5610 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala @@ -41,7 +41,7 @@ object PartitionDropper { val dropWithData = dropPartitionCallableModel.dropWithData val carbonTable = dropPartitionCallableModel.carbonTable val dbName = carbonTable.getDatabaseName - val tableName = carbonTable.getFactTableName + val tableName = carbonTable.getTableName val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val partitionInfo = carbonTable.getPartitionInfo(tableName) val partitioner = PartitionFactory.getPartitioner(partitionInfo) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala index db664b3..9106cca 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala @@ -40,7 +40,7 @@ object PartitionSplitter { val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val tableName = carbonTable.getFactTableName + val tableName = carbonTable.getTableName val databaseName = carbonTable.getDatabaseName val bucketInfo = carbonTable.getBucketingInfo(tableName) var finalSplitStatus = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 1b21e3d..a3572ed 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -55,7 +55,6 @@ import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD @@ -516,7 +515,6 @@ object CommonUtil { } def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel, - storePath: String, insertOverwrite: Boolean): Unit = { val newLoadMetaEntry = new LoadMetadataDetails val status = if (insertOverwrite) { @@ -528,16 +526,13 @@ object CommonUtil { // reading the start time of data load. val loadStartTime = CarbonUpdateUtil.readCurrentTime model.setFactTimeStamp(loadStartTime) - CarbonLoaderUtil - .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp, false) + CarbonLoaderUtil.populateNewLoadMetaEntry( + newLoadMetaEntry, status, model.getFactTimeStamp, false) val entryAdded: Boolean = CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite) if (!entryAdded) { - sys - .error(s"Failed to add entry in table status for ${ model.getDatabaseName }.${ - model - .getTableName - }") + sys.error(s"Failed to add entry in table status for " + + s"${ model.getDatabaseName }.${model.getTableName}") } } @@ -856,26 +851,9 @@ object CommonUtil { CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath, - carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath, + carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath, segmentIds).collect() } } - /** - * can be removed with the spark 1.6 removal - * @param tableMeta - * @return - */ - @deprecated - def getTablePath(tableMeta: TableMeta): String = { - if (tableMeta.tablePath == null) { - tableMeta.storePath + CarbonCommonConstants.FILE_SEPARATOR + - tableMeta.carbonTableIdentifier.getDatabaseName + - CarbonCommonConstants.FILE_SEPARATOR + tableMeta.carbonTableIdentifier.getTableName - } - else { - tableMeta.tablePath - } - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index 35e1e78..84ad85e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -180,10 +180,10 @@ object DataLoadingUtil { options: immutable.Map[String, String], optionsFinal: mutable.Map[String, String], carbonLoadModel: CarbonLoadModel): Unit = { - carbonLoadModel.setTableName(table.getFactTableName) + carbonLoadModel.setTableName(table.getTableName) carbonLoadModel.setDatabaseName(table.getDatabaseName) carbonLoadModel.setTablePath(table.getTablePath) - carbonLoadModel.setTableName(table.getFactTableName) + carbonLoadModel.setTableName(table.getTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) @@ -199,7 +199,7 @@ object DataLoadingUtil { val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2") val all_dictionary_path = optionsFinal("all_dictionary_path") val column_dict = optionsFinal("columndict") - ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName) + ValidateUtil.validateDateFormat(dateFormat, table, table.getTableName) ValidateUtil.validateSortScope(table, sort_scope) if (bad_records_logger_enable.toBoolean || @@ -236,7 +236,7 @@ object DataLoadingUtil { } } else { if (fileHeader.isEmpty) { - fileHeader = table.getCreateOrderColumn(table.getFactTableName) + fileHeader = table.getCreateOrderColumn(table.getTableName) .asScala.map(_.getColName).mkString(",") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 975fc9b..0bf2b16 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -676,17 +676,17 @@ object GlobalDictionaryUtil { */ def generateGlobalDictionary(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - storePath: String, + tablePath: String, dataFrame: Option[DataFrame] = None): Unit = { try { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier // create dictionary folder if not exists - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier) val dictfolderPath = carbonTablePath.getMetadataDirectoryPath // columns which need to generate global dictionary file val dimensions = carbonTable.getDimensionByTableName( - carbonTable.getFactTableName).asScala.toArray + carbonTable.getTableName).asScala.toArray // generate global dict from pre defined column dict file carbonLoadModel.initPredefDictMap() @@ -701,7 +701,7 @@ object GlobalDictionaryUtil { if (colDictFilePath != null) { // generate predefined dictionary generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, - dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath) + dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath) } if (headers.length > df.columns.length) { val msg = "The number of columns in the file header do not match the " + @@ -717,7 +717,7 @@ object GlobalDictionaryUtil { // select column to push down pruning df = df.select(requireColumnNames.head, requireColumnNames.tail: _*) val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier, - requireDimension, storePath, dictfolderPath, false) + requireDimension, tablePath, dictfolderPath, false) // combine distinct value in a block and partition by column val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model) .partitionBy(new ColumnPartitioner(model.primDimensions.length)) @@ -731,7 +731,7 @@ object GlobalDictionaryUtil { } else { generateDictionaryFromDictionaryFiles(sqlContext, carbonLoadModel, - storePath, + tablePath, carbonTableIdentifier, dictfolderPath, dimensions, http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 756de6b..2f6b277 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -454,7 +454,7 @@ class TableNewProcessor(cm: TableModel) { val field = cm.dimCols.find(keyDim equals _.column).get val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) { cm.parentTable.get.getColumnByName( - cm.parentTable.get.getFactTableName, + cm.parentTable.get.getTableName, cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder } else { val encoders = new java.util.ArrayList[Encoding]() @@ -479,7 +479,7 @@ class TableNewProcessor(cm: TableModel) { val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) { cm.parentTable.get.getColumnByName( - cm.parentTable.get.getFactTableName, + cm.parentTable.get.getTableName, cm.dataMapRelation.get.get(field).get. columnTableRelation.get.parentColumnName).getEncoder } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 1ca7456..c12d2ef 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -179,7 +179,7 @@ object CarbonDataRDDFactory { while (null != tableForCompaction) { LOGGER.info("Compaction request has been identified for table " + s"${ tableForCompaction.getDatabaseName }." + - s"${ tableForCompaction.getFactTableName}") + s"${ tableForCompaction.getTableName}") val table: CarbonTable = tableForCompaction val metadataPath = table.getMetaDataFilepath val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) @@ -204,7 +204,7 @@ object CarbonDataRDDFactory { case e: Exception => LOGGER.error("Exception in compaction thread for table " + s"${ tableForCompaction.getDatabaseName }." + - s"${ tableForCompaction.getFactTableName }") + s"${ tableForCompaction.getTableName }") // not handling the exception. only logging as this is not the table triggered // by user. } finally { @@ -216,7 +216,7 @@ object CarbonDataRDDFactory { skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier) LOGGER.error("Compaction request file can not be deleted for table " + s"${ tableForCompaction.getDatabaseName }." + - s"${ tableForCompaction.getFactTableName }") + s"${ tableForCompaction.getTableName }") } } // ********* check again for all the tables. @@ -248,7 +248,7 @@ object CarbonDataRDDFactory { table: CarbonTable ): CarbonLoadModel = { val loadModel = new CarbonLoadModel - loadModel.setTableName(table.getFactTableName) + loadModel.setTableName(table.getTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation loadModel.setCarbonDataLoadSchema(dataLoadSchema) @@ -319,7 +319,7 @@ object CarbonDataRDDFactory { } } } else { - status = if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { + status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel) } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext, @@ -782,7 +782,7 @@ object CarbonDataRDDFactory { dataFrame: Option[DataFrame], carbonLoadModel: CarbonLoadModel): RDD[Row] = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName) val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType val columns = carbonLoadModel.getCsvHeaderColumns http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index 1e6a36e..47f5344 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -21,23 +21,20 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap} -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.processing.merger.TableMeta case class TransformHolder(rdd: Any, mataData: CarbonMetaData) object CarbonSparkUtil { def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = { - val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName) + val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName) .asScala.map(x => x.getColName) // wf : may be problem - val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName) + val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName) .asScala.map(x => x.getColName) val dictionary = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f => + carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f => (f.getColName.toLowerCase, f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) && !f.getDataType.isComplexType) @@ -47,10 +44,11 @@ object CarbonSparkUtil { def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = { val table = CarbonTable.buildFromTableInfo(tableInfo) - val meta = new TableMeta(table.getCarbonTableIdentifier, - table.getTablePath, tablePath, table) - CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName, - CarbonSparkUtil.createSparkMeta(table), meta) + CarbonRelation( + tableInfo.getDatabaseName, + tableInfo.getFactTable.getTableName, + CarbonSparkUtil.createSparkMeta(table), + table) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index 44fbb37..b74576d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.types._ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType} +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.CarbonOption class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { @@ -58,7 +59,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { */ private def loadTempCSV(options: CarbonOption): Unit = { // temporary solution: write to csv file, then load the csv into carbon - val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath + val storePath = CarbonProperties.getStorePath val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR) .append("tempCSV") .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 22933f2..72f40ac 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -50,7 +50,7 @@ case class CarbonDatasourceHadoopRelation( lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head, parameters("dbname"), parameters("tablename")) lazy val databaseName: String = carbonTable.getDatabaseName - lazy val tableName: String = carbonTable.getFactTableName + lazy val tableName: String = carbonTable.getTableName CarbonSession.updateSessionInfoToCurrentThread(sparkSession) @transient lazy val carbonRelation: CarbonRelation = @@ -58,7 +58,7 @@ case class CarbonDatasourceHadoopRelation( createCarbonRelation(parameters, identifier, sparkSession) - @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable + @transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable override def sqlContext: SQLContext = sparkSession.sqlContext http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index c7db436..9d88c4c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -72,7 +72,7 @@ case class CarbonDictionaryDecoder( attachTree(this, "execute") { val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable - (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) + (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier) }.toMap if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) { @@ -125,7 +125,7 @@ case class CarbonDictionaryDecoder( val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable - (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) + (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier) }.toMap if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) { @@ -323,7 +323,7 @@ object CarbonDictionaryDecoder { if (relation.isDefined && canBeDecoded(attr, profile)) { val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable val carbonDimension = carbonTable - .getDimensionByName(carbonTable.getFactTableName, attr.name) + .getDimensionByName(carbonTable.getTableName, attr.name) if (carbonDimension != null && carbonDimension.hasEncoding(Encoding.DICTIONARY) && !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && @@ -355,7 +355,7 @@ object CarbonDictionaryDecoder { if (relation.isDefined) { val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable val carbonDimension = carbonTable - .getDimensionByName(carbonTable.getFactTableName, attr.name) + .getDimensionByName(carbonTable.getTableName, attr.name) if (carbonDimension != null && carbonDimension.hasEncoding(Encoding.DICTIONARY) && !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && @@ -432,12 +432,12 @@ object CarbonDictionaryDecoder { if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) { val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable val carbonDimension = - carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name) + carbonTable.getDimensionByName(carbonTable.getTableName, attr.name) if (carbonDimension != null && carbonDimension.hasEncoding(Encoding.DICTIONARY) && !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonDimension.isComplex) { - (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier, + (carbonTable.getTableName, carbonDimension.getColumnIdentifier, carbonDimension) } else { (null, null, null) @@ -485,12 +485,12 @@ class CarbonDecoderRDD( if (relation.isDefined && canBeDecoded(attr)) { val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable val carbonDimension = - carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name) + carbonTable.getDimensionByName(carbonTable.getTableName, attr.name) if (carbonDimension != null && carbonDimension.hasEncoding(Encoding.DICTIONARY) && !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonDimension.isComplex()) { - (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier, + (carbonTable.getTableName, carbonDimension.getColumnIdentifier, carbonDimension) } else { (null, null, null) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 1ee7650..dcfce0f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -20,13 +20,15 @@ package org.apache.spark.sql import java.util.Map import java.util.concurrent.ConcurrentHashMap -import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.internal.CarbonSQLConf import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo} -import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationListenerBus} import org.apache.carbondata.spark.rdd.SparkReadSupport import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl @@ -41,8 +43,6 @@ class CarbonEnv { var carbonSessionInfo: CarbonSessionInfo = _ - var storePath: String = _ - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) // set readsupport class global so that the executor can get it. @@ -74,7 +74,7 @@ class CarbonEnv { config.addDefaultCarbonSessionParams() carbonMetastore = { val properties = CarbonProperties.getInstance() - storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION) + var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION) if (storePath == null) { storePath = sparkSession.conf.get("spark.sql.warehouse.dir") properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) @@ -112,6 +112,30 @@ object CarbonEnv { carbonEnv } } -} + /** + * Return carbon table instance by looking up relation in `sparkSession` + */ + def getCarbonTable( + databaseNameOp: Option[String], + tableName: String) + (sparkSession: SparkSession): CarbonTable = { + CarbonEnv + .getInstance(sparkSession) + .carbonMetastore + .lookupRelation(databaseNameOp, tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + } + def getCarbonTable( + tableIdentifier: TableIdentifier) + (sparkSession: SparkSession): CarbonTable = { + CarbonEnv + .getInstance(sparkSession) + .carbonMetastore + .lookupRelation(tableIdentifier)(sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala index 0806421..99a7c37 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala @@ -65,11 +65,11 @@ case class CarbonScan( attributesRaw = attributeOut } - val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName) + val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName) val colAttr = new Array[Attribute](columns.size()) attributesRaw.foreach { attr => val column = - carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name) + carbonTable.getColumnByName(carbonTable.getTableName, attr.name) if(column != null) { colAttr(columns.indexOf(column)) = attr } @@ -78,7 +78,7 @@ case class CarbonScan( var queryOrder: Integer = 0 attributesRaw.foreach { attr => - val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name) + val carbonColumn = carbonTable.getColumnByName(carbonTable.getTableName, attr.name) if (carbonColumn != null) { if (carbonColumn.isDimension()) { val dim = new QueryDimension(attr.name) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index fba590e..6331f12 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -165,7 +165,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } else { CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(Option(dbName), tableName)(sparkSession) - (CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName", parameters) + (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters) } } catch { case ex: NoSuchTableException => @@ -199,11 +199,10 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider if (parameters.contains("tablePath")) { (parameters("tablePath"), parameters) } else if (!sparkSession.isInstanceOf[CarbonSession]) { - (CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName, parameters) + (CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, parameters) } else { - val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - (relation.tableMeta.tablePath, parameters) + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + (carbonTable.getTablePath, parameters) } } catch { case ex: Exception => @@ -235,15 +234,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider } if (tablePathOption.isDefined) { val sparkSession = sqlContext.sparkSession - val identifier: AbsoluteTableIdentifier = - AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName) - val carbonTable = - CarbonEnv.getInstance(sparkSession).carbonMetastore. - createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) if (!carbonTable.isStreamingTable) { throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." + - s"${carbonTable.getFactTableName} is not a streaming table") + s"${carbonTable.getTableName} is not a streaming table") } // create sink @@ -314,8 +309,7 @@ object CarbonSource { val tableName: String = properties.getOrElse("tableName", "").toLowerCase val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName) val tableInfo: TableInfo = TableNewProcessor(model) - val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, - CarbonEnv.getInstance(sparkSession).storePath) + val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath) val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName val schemaEvolutionEntry = new SchemaEvolutionEntry schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala index 197b23b..f83766d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala @@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.TableInfo -import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} case class CarbonCreateTableCommand( cm: TableModel, @@ -37,7 +37,7 @@ case class CarbonCreateTableCommand( } override def processSchema(sparkSession: SparkSession): Seq[Row] = { - val storePath = CarbonEnv.getInstance(sparkSession).storePath + val storePath = CarbonProperties.getStorePath CarbonEnv.getInstance(sparkSession).carbonMetastore. checkSchemasModifiedTimeAndReloadTables() val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala index 7dcad9a..b233c99 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala @@ -29,6 +29,7 @@ import org.codehaus.jackson.map.ObjectMapper import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension +import org.apache.carbondata.core.util.CarbonProperties private[sql] case class CarbonDescribeFormattedCommand( child: SparkPlan, @@ -68,7 +69,7 @@ private[sql] case class CarbonDescribeFormattedCommand( val colComment = field.getComment().getOrElse("null") val comment = if (dims.contains(fieldName)) { val dimension = relation.metaData.carbonTable.getDimensionByName( - relation.tableMeta.carbonTableIdentifier.getTableName, fieldName) + relation.carbonTable.getTableName, fieldName) if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) { colProps.append(fieldName).append(".") .append(mapper.writeValueAsString(dimension.getColumnProperties)) @@ -101,12 +102,11 @@ private[sql] case class CarbonDescribeFormattedCommand( colProps.toString() } results ++= Seq(("", "", ""), ("##Detailed Table Information", "", "")) - results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier - .getDatabaseName, "") + results ++= Seq(("Database Name: ", relation.carbonTable.getDatabaseName, "") ) - results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, "")) - results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, "")) - val carbonTable = relation.tableMeta.carbonTable + results ++= Seq(("Table Name: ", relation.carbonTable.getTableName, "")) + results ++= Seq(("CARBON Store Path: ", CarbonProperties.getStorePath, "")) + val carbonTable = relation.carbonTable // Carbon table support table comment val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "") @@ -122,14 +122,14 @@ private[sql] case class CarbonDescribeFormattedCommand( results ++= Seq(("ADAPTIVE", "", "")) } results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns( - relation.tableMeta.carbonTableIdentifier.getTableName).asScala + relation.carbonTable.getTableName).asScala .map(column => column).mkString(","), "")) val dimension = carbonTable - .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName) + .getDimensionByTableName(relation.carbonTable.getTableName) results ++= getColumnGroups(dimension.asScala.toList) - if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { + if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { results ++= - Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName) + Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getTableName) .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), "")) } results.map { http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala index 0343393..f0a916a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala @@ -30,7 +30,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events._ @@ -50,12 +50,11 @@ case class CarbonDropTableCommand( val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession) val identifier = TableIdentifier(tableName, Option(dbName)) - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) val carbonEnv = CarbonEnv.getInstance(sparkSession) val catalog = carbonEnv.carbonMetastore val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession, - CarbonEnv.getInstance(sparkSession).storePath) + CarbonProperties.getStorePath) val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase val absoluteTableIdentifier = AbsoluteTableIdentifier .from(tablePath, dbName.toLowerCase, tableName.toLowerCase) @@ -68,7 +67,7 @@ case class CarbonDropTableCommand( LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") val carbonTable: Option[CarbonTable] = catalog.getTableFromMetadataCache(dbName, tableName) match { - case Some(tableMeta) => Some(tableMeta.carbonTable) + case Some(carbonTable) => Some(carbonTable) case None => try { Some(catalog.lookupRelation(identifier)(sparkSession) .asInstanceOf[CarbonRelation].metaData.carbonTable) @@ -131,7 +130,7 @@ case class CarbonDropTableCommand( // delete the table folder val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession) val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession, - CarbonEnv.getInstance(sparkSession).storePath) + CarbonProperties.getStorePath) val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName) val metadataFilePath =