Indhumathi27 commented on a change in pull request #3688: [CARBONDATA-3765] Refactor Index Metadata for CG and FG Indexes
URL: https://github.com/apache/carbondata/pull/3688#discussion_r406576162
 
 

 ##########
 File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
 ##########
 @@ -67,128 +82,195 @@ case class CarbonCreateIndexCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
 
-    if 
(DataMapStoreManager.getInstance().isDataMapExist(parentTable.getTableId, 
indexName)) {
-      if (!ifNotExistsSet) {
-        throw new MalformedIndexCommandException(
-          s"Index with name ${ indexName } on table " +
-            s"${parentTable.getDatabaseName}.${parentTable.getTableName} 
already exists")
-      } else {
-        return Seq.empty
-      }
+    if (parentTable.isMVTable || parentTable.isIndexTable) {
+      throw new MalformedIndexCommandException(
+        "Cannot create index on child table `" + indexName + "`")
     }
 
     if (CarbonUtil.getFormatVersion(parentTable) != ColumnarFormatVersion.V3) {
-      throw new MalformedCarbonCommandException(s"Unsupported operation on 
table with " +
-                                                s"V1 or V2 format data")
+      throw new MalformedCarbonCommandException(
+        s"Unsupported operation on table with V1 or V2 format data")
     }
 
-    dataMapSchema = new DataMapSchema(indexName, indexProviderName)
+    // get metadata lock to avoid concurrent create index operations
+    val metadataLock = CarbonLockFactory.getCarbonLockObj(
+      parentTable.getAbsoluteTableIdentifier,
+      LockUsage.METADATA_LOCK)
 
-    val property = properties.map(x => (x._1.trim, x._2.trim)).asJava
-    val javaMap = new java.util.HashMap[String, String](property)
-    javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString)
-    javaMap.put(CarbonCommonConstants.INDEX_COLUMNS, 
indexModel.columnNames.mkString(","))
-    dataMapSchema.setProperties(javaMap)
+    try {
+      if (metadataLock.lockWithRetries()) {
+        LOGGER.info(s"Acquired the metadata lock for table 
$dbName.$parentTableName")
+        // get carbon table again to reflect any changes during lock acquire.
+        parentTable =
+          CarbonEnv.getInstance(sparkSession).carbonMetaStore
+            .lookupRelation(Some(dbName), parentTableName)(sparkSession)
+            .asInstanceOf[CarbonRelation].carbonTable
+        if (parentTable == null) {
+          throw new MalformedIndexCommandException(errMsg)
+        }
+        val oldIndexMetaData = parentTable.getIndexMetadata
+        // check whether the column has index created already
+        if (null != oldIndexMetaData) {
+          val indexExistsInCarbon = 
oldIndexMetaData.getIndexTables.asScala.contains(indexName)
+          if (indexExistsInCarbon) {
+            throw new MalformedIndexCommandException(
+              "Index with name `" + indexName + "` already exists on table `" 
+ parentTableName +
+              "`")
+          }
+        }
+        // set properties
+        indexSchema.setProperties(indexProperties)
+        provider = new IndexProvider(parentTable, indexSchema, sparkSession)
 
-    if (dataMapSchema.isIndex && parentTable == null) {
-      throw new MalformedIndexCommandException(
-        "To create index, main table is required. Use `CREATE INDEX ... ON 
TABLE ...` ")
-    }
-    provider = new IndexProvider(parentTable, dataMapSchema, sparkSession)
-    if (deferredRebuild && !provider.supportRebuild()) {
-      throw new MalformedIndexCommandException(
-        s"DEFERRED REFRESH is not supported on this index $indexName" +
-        s" with provider ${dataMapSchema.getProviderName}")
-    }
+        if (deferredRebuild && !provider.supportRebuild()) {
+          throw new MalformedIndexCommandException(
+          "DEFERRED REFRESH is not supported on this index " + 
indexModel.indexName +
+          " with provider " + indexProviderName)
+        } else if (deferredRebuild && provider.supportRebuild()) {
+          indexProperties.put(CarbonCommonConstants.INDEX_STATUS, 
IndexStatus.DISABLED.name())
+        }
 
-    if (parentTable.isMVTable) {
-      throw new MalformedIndexCommandException(
-        "Cannot create index on MV table " + parentTable.getTableUniqueName)
-    }
+        val isBloomFilter = 
CarbonIndexProvider.BLOOMFILTER.getIndexProviderName
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to