Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ecf29472e -> 526243b09
fixLoadTableForSpark2

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1b5e7fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1b5e7fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1b5e7fb4

Branch: refs/heads/master
Commit: 1b5e7fb442dd99b859b819fad5dea8cbc754e4c2
Parents: ecf2947
Author: QiangCai <qiang...@qq.com>
Authored: Fri Dec 16 00:26:09 2016 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Fri Dec 16 00:46:54 2016 +0800

----------------------------------------------------------------------
 .../execution/command/carbonTableSchema.scala | 46 +++++++++++++++++++-
 1 file changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b5e7fb4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 7f74d92..10fffd9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils

@@ -34,6 +34,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -43,7 +45,7 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}

 /**
@@ -259,6 +261,45 @@ case class DeleteLoadsByLoadDate(

 }

+object LoadTable {
+
+  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+      model.table)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+    // read TableInfo
+    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+
+    // write TableInfo
+    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+    // update Metadata
+    val catalog = CarbonEnv.get.carbonMetastore
+    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+    // update CarbonDataLoadSchema
+    val carbonTable = catalog.lookupRelation(Option(model.table.getDatabaseName),
+      model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+  }
+
+}
+
 case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan) {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -436,6 +477,7 @@ case class LoadTable(
         carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
+        GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
         GlobalDictionaryUtil
           .generateGlobalDictionary(
           sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, dataFrame)