[CARBONDATA-2986] Table properties are lost when multiple drivers concurrently create the same table
Issue :- When multiple drivers concurrently create the same table, its table properties are lost. Root Cause :- The schema file is overwritten from CarbonRelation#createTableIfNotExists because the table lookup fails. This happens because the .mdt file is concurrently updated and the current table is removed from the cache (org.apache.spark.sql.hive.CarbonFileMetastore#checkSchemasModifiedTimeAndReloadTable). Solution :- Since the carbon table has already been created and the schema file has already been written, there is no need to perform the lookup again. This closes #2785 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3edea12a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3edea12a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3edea12a Branch: refs/heads/branch-1.5 Commit: 3edea12a83e70dddb1eca271bf5660f73de272f5 Parents: 11bd0ad Author: BJangir <babulaljangir...@gmail.com> Authored: Fri Sep 28 17:17:30 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Oct 4 18:05:06 2018 +0530 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/CarbonSource.scala | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3edea12a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 16cee96..cd1087d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -57,6 +57,7 @@ class CarbonSource extends CreatableRelationProvider
with RelationProvider with SchemaRelationProvider with StreamSinkProvider with DataSourceRegister { override def shortName(): String = "carbondata" + private val LOGGER = LogServiceFactory.getLogService(CarbonSource.getClass.getName) // will be called if hive supported create table command is provided override def createRelation(sqlContext: SQLContext, @@ -143,7 +144,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider .exists(_.table.equalsIgnoreCase(tableName))) { getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters) } else { - createTableIfNotExists(sqlContext.sparkSession, newParameters, dataSchema) + createTableIfNotExists(sqlContext.sparkSession, dbName, tableName, newParameters, dataSchema) } CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), updatedParams, @@ -160,6 +161,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider private def createTableIfNotExists( sparkSession: SparkSession, + dbName: String, + tableName: String, parameters: Map[String, String], dataSchema: StructType): (String, Map[String, String]) = { @@ -167,10 +170,18 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider val tableName: String = parameters.getOrElse("tableName", "").toLowerCase try { - val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - (carbonTable.getTablePath, parameters) + if (!(parameters.contains("carbonSchemaPartsNo") + || parameters.contains("carbonschemapartsno"))) { + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + (carbonTable.getTablePath, parameters) + } else { + (getPathForTable(sparkSession, dbName, tableName, parameters)) + } + } catch { case _: NoSuchTableException => + LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " + + "Now existing Schema will be overwritten with default properties") val metaStore = 
CarbonEnv.getInstance(sparkSession).carbonMetastore val identifier = AbsoluteTableIdentifier.from( CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),