[CARBONDATA-2771]block update and delete on table if the compaction is in 
progress & local dictionary issue for old table

Problem:
If an update is triggered while compaction is in progress, in some cases there 
will be a data mismatch, and sometimes the update fails because the segmentId 
mapping used in getPartitions may differ during the update operation.
For older tables, describe formatted was showing true for local dictionary enable.

Solution:
Block update and delete on a table if compaction is in progress for it.
For older tables, the local dictionary property will be null, so describe 
formatted should show false for local dictionary enable.

This closes #2541


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/00769570
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/00769570
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/00769570

Branch: refs/heads/branch-1.4
Commit: 007695703def706e3846671d7745001431507277
Parents: b6f1258
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Mon Jul 23 20:29:46 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 20 +++++++
 .../createTable/TestCreateExternalTable.scala   | 32 ++++++-----
 .../TestNonTransactionalCarbonTable.scala       | 12 +++++
 ...ransactionalCarbonTableWithComplexType.scala |  3 ++
 .../CarbonProjectForDeleteCommand.scala         |  4 ++
 .../CarbonProjectForUpdateCommand.scala         |  3 ++
 .../table/CarbonDescribeFormattedCommand.scala  | 57 ++++++++++----------
 .../org/apache/spark/util/AlterTableUtil.scala  | 12 ++++-
 8 files changed, 101 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 5c73259..d5b456c 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -770,6 +770,26 @@ public class SegmentStatusManager {
   }
 
   /**
+   * Return true if the compaction is in progress for the table
+   * @param carbonTable
+   * @return
+   */
+  public static Boolean isCompactionInProgress(CarbonTable carbonTable) {
+    if (carbonTable == null) {
+      return false;
+    }
+    boolean compactionInProgress;
+    ICarbonLock lock = CarbonLockFactory
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), 
LockUsage.COMPACTION_LOCK);
+    try {
+      compactionInProgress = !lock.lockWithRetries(1, 0);
+    } finally {
+      lock.unlock();
+    }
+    return compactionInProgress;
+  }
+
+  /**
    * Return true if insert overwrite is in progress for specified table
    */
   public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
index 519089b..a9b8d57 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
@@ -46,9 +46,23 @@ class TestCreateExternalTable extends QueryTest with 
BeforeAndAfterAll {
   test("create external table with existing files") {
     assert(new File(originDataPath).exists())
     sql("DROP TABLE IF EXISTS source")
-    if (CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-        
CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("false"))
 {
+    if (System
+          .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+            
CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("true")
 ||
+        CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+            
CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("true"))
 {
+
+      intercept[Exception] {
+        // create external table with existing files
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE source
+             |STORED BY 'carbondata'
+             |LOCATION '$storeLocation/origin'
+       """.stripMargin)
+      }
+    } else {
 
       // create external table with existing files
       sql(
@@ -68,17 +82,7 @@ class TestCreateExternalTable extends QueryTest with 
BeforeAndAfterAll {
 
       // DROP TABLE should not delete data
       assert(new File(originDataPath).exists())
-    }
-    else {
-      intercept[Exception] {
-        // create external table with existing files
-        sql(
-          s"""
-             |CREATE EXTERNAL TABLE source
-             |STORED BY 'carbondata'
-             |LOCATION '$storeLocation/origin'
-       """.stripMargin)
-      }
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index c7d9caa..62c3df6 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2329,12 +2329,15 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) 
match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
     FileUtils.deleteDirectory(new File(writerPath))
   }
@@ -2355,12 +2358,15 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) 
match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
     FileUtils.deleteDirectory(new File(writerPath))
   }
@@ -2381,12 +2387,15 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) 
match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
     FileUtils.deleteDirectory(new File(writerPath))
   }
@@ -2411,12 +2420,15 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) 
match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
 
     checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(1)))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 8a27d0d..7593f38 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -219,12 +219,15 @@ class TestNonTransactionalCarbonTableWithComplexType 
extends QueryTest with Befo
     val descLoc = sql("describe formatted localComplex").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match 
{
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) 
match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match 
{
       case Some(row) => 
assert(row.get(1).toString.contains("name,val1.val2.street,val1.val2.city,val1.val2.WindSpeed,val1.val2.year"))
+      case None => assert(false)
     }
 
     // TODO: Add a validation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f1fa9b3..0127d7e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -50,6 +50,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
 
+    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "compaction", "data 
delete")
+    }
+
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data 
delete")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 964407f..4e9c1af 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -70,6 +70,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
+    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "compaction", "data 
update")
+    }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data 
update")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 9e1e0e4..41dfea5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -123,41 +123,44 @@ private[sql] case class CarbonDescribeFormattedCommand(
     }
 
     var isLocalDictEnabled = tblProps.asScala
-      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
-        CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
-    val localDictEnabled = isLocalDictEnabled.split(",") { 0 }
-    results ++= Seq(("Local Dictionary Enabled", localDictEnabled, ""))
-    // if local dictionary is enabled, then only show other properties of 
local dictionary
-    if (localDictEnabled.toBoolean) {
-      var localDictThreshold = tblProps.asScala
-        .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
-          CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
-      val localDictionaryThreshold = localDictThreshold.split(",")
-      localDictThreshold = localDictionaryThreshold { 0 }
-      results ++= Seq(("Local Dictionary Threshold", localDictThreshold, ""))
-      val columns = 
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
-      val builder = new StringBuilder
-      columns.foreach { column =>
-        if (column.isLocalDictColumn && !column.isInvisible) {
-          builder.append(column.getColumnName).append(",")
-        }
-      }
-      results ++=
-      Seq(("Local Dictionary Include", 
getDictColumnString(builder.toString().split(",")), ""))
-      if (tblProps.asScala
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
+      .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+    if (isLocalDictEnabled.isDefined) {
+      val localDictEnabled = isLocalDictEnabled.get.split(",") { 0 }
+      results ++= Seq(("Local Dictionary Enabled", localDictEnabled, ""))
+      // if local dictionary is enabled, then only show other properties of 
local dictionary
+      if (localDictEnabled.toBoolean) {
+        var localDictThreshold = tblProps.asScala
+          .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+            CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+        val localDictionaryThreshold = localDictThreshold.split(",")
+        localDictThreshold = localDictionaryThreshold { 0 }
+        results ++= Seq(("Local Dictionary Threshold", localDictThreshold, ""))
         val columns = 
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
         val builder = new StringBuilder
         columns.foreach { column =>
-          if (!column.isLocalDictColumn && !column.isInvisible &&
-              (column.getDataType.equals(DataTypes.STRING) ||
-               column.getDataType.equals(DataTypes.VARCHAR))) {
+          if (column.isLocalDictColumn && !column.isInvisible) {
             builder.append(column.getColumnName).append(",")
           }
         }
         results ++=
-        Seq(("Local Dictionary Exclude", 
getDictColumnString(builder.toString().split(",")), ""))
+        Seq(("Local Dictionary Include", 
getDictColumnString(builder.toString().split(",")), ""))
+        if (tblProps.asScala
+          .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
+          val columns = 
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+          val builder = new StringBuilder
+          columns.foreach { column =>
+            if (!column.isLocalDictColumn && !column.isInvisible &&
+                (column.getDataType.equals(DataTypes.STRING) ||
+                 column.getDataType.equals(DataTypes.VARCHAR))) {
+              builder.append(column.getColumnName).append(",")
+            }
+          }
+          results ++=
+          Seq(("Local Dictionary Exclude", 
getDictColumnString(builder.toString().split(",")), ""))
+        }
       }
+    } else {
+      results ++= Seq(("Local Dictionary Enabled", "false", ""))
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 96b191f..cab9de5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -346,7 +346,17 @@ object AlterTableUtil {
         // since thriftTable also holds comment as its property.
         propKeys.foreach { propKey =>
           if (validateTableProperties(propKey)) {
-            tblPropertiesMap.remove(propKey.toLowerCase)
+            // This check is required because for old tables we need to keep 
same behavior for it,
+            // meaning, local dictionary should be disabled. To enable we can 
use set command for
+            // older tables. So no need to remove from table properties map 
for unset just to ensure
+            // for older table behavior. So in case of unset, if enable 
property is already present
+            // in map, then just set it to default value of local dictionary 
which is true.
+            if 
(!propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
+              tblPropertiesMap.remove(propKey.toLowerCase)
+            } else {
+              tblPropertiesMap
+                .put(propKey.toLowerCase, 
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+            }
           } else {
             val errorMessage = "Error: Invalid option(s): " + propKey
             throw new MalformedCarbonCommandException(errorMessage)

Reply via email to