[CARBONDATA-2771]block update and delete on table if the compaction is in progress & local dictionary issue for old table
Problem: if update is triggered when compaction is in progress, in some cases there will be data mismatch and sometimes update fails, as the segmentId mapping which is used in getPartitions may differ during the update operation. For older tables, describe formatted was showing true for local dictionary enable. Solution: Block update and delete if compaction is in progress for the table. For older tables, the local dictionary property will be null, so describe formatted should show false for local dictionary enable. This closes #2541 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/00769570 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/00769570 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/00769570 Branch: refs/heads/branch-1.4 Commit: 007695703def706e3846671d7745001431507277 Parents: b6f1258 Author: akashrn5 <akashnilu...@gmail.com> Authored: Mon Jul 23 20:29:46 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jul 31 00:11:26 2018 +0530 ---------------------------------------------------------------------- .../statusmanager/SegmentStatusManager.java | 20 +++++++ .../createTable/TestCreateExternalTable.scala | 32 ++++++----- .../TestNonTransactionalCarbonTable.scala | 12 +++++ ...ransactionalCarbonTableWithComplexType.scala | 3 ++ .../CarbonProjectForDeleteCommand.scala | 4 ++ .../CarbonProjectForUpdateCommand.scala | 3 ++ .../table/CarbonDescribeFormattedCommand.scala | 57 ++++++++++---------- .../org/apache/spark/util/AlterTableUtil.scala | 12 ++++- 8 files changed, 101 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 5c73259..d5b456c 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -770,6 +770,26 @@ public class SegmentStatusManager { } /** + * Return true if the compaction is in progress for the table + * @param carbonTable + * @return + */ + public static Boolean isCompactionInProgress(CarbonTable carbonTable) { + if (carbonTable == null) { + return false; + } + boolean compactionInProgress; + ICarbonLock lock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.COMPACTION_LOCK); + try { + compactionInProgress = !lock.lockWithRetries(1, 0); + } finally { + lock.unlock(); + } + return compactionInProgress; + } + + /** * Return true if insert overwrite is in progress for specified table */ public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala index 519089b..a9b8d57 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala @@ -46,9 +46,23 @@ class TestCreateExternalTable 
extends QueryTest with BeforeAndAfterAll { test("create external table with existing files") { assert(new File(originDataPath).exists()) sql("DROP TABLE IF EXISTS source") - if (CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("false")) { + if (System + .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("true") || + CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("true")) { + + intercept[Exception] { + // create external table with existing files + sql( + s""" + |CREATE EXTERNAL TABLE source + |STORED BY 'carbondata' + |LOCATION '$storeLocation/origin' + """.stripMargin) + } + } else { // create external table with existing files sql( @@ -68,17 +82,7 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll { // DROP TABLE should not delete data assert(new File(originDataPath).exists()) - } - else { - intercept[Exception] { - // create external table with existing files - sql( - s""" - |CREATE EXTERNAL TABLE source - |STORED BY 'carbondata' - |LOCATION '$storeLocation/origin' - """.stripMargin) - } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index c7d9caa..62c3df6 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -2329,12 +2329,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val descLoc = sql("describe formatted sdkTable").collect descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match { case Some(row) => assert(row.get(1).toString.contains("true")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match { case Some(row) => assert(row.get(1).toString.contains("10000")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match { case Some(row) => assert(row.get(1).toString.contains("name,surname")) + case None => assert(false) } FileUtils.deleteDirectory(new File(writerPath)) } @@ -2355,12 +2358,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val descLoc = sql("describe formatted sdkTable").collect descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match { case Some(row) => assert(row.get(1).toString.contains("true")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match { case Some(row) => assert(row.get(1).toString.contains("10000")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match { case Some(row) => assert(row.get(1).toString.contains("name,surname")) + case None => assert(false) } FileUtils.deleteDirectory(new File(writerPath)) } @@ -2381,12 +2387,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val descLoc = sql("describe formatted sdkTable").collect descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match { case Some(row) 
=> assert(row.get(1).toString.contains("true")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match { case Some(row) => assert(row.get(1).toString.contains("10000")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match { case Some(row) => assert(row.get(1).toString.contains("name,surname")) + case None => assert(false) } FileUtils.deleteDirectory(new File(writerPath)) } @@ -2411,12 +2420,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val descLoc = sql("describe formatted sdkTable").collect descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match { case Some(row) => assert(row.get(1).toString.contains("true")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match { case Some(row) => assert(row.get(1).toString.contains("10000")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match { case Some(row) => assert(row.get(1).toString.contains("name,surname")) + case None => assert(false) } checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(1))) http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala index 8a27d0d..7593f38 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala @@ -219,12 +219,15 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo val descLoc = sql("describe formatted localComplex").collect descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match { case Some(row) => assert(row.get(1).toString.contains("true")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match { case Some(row) => assert(row.get(1).toString.contains("10000")) + case None => assert(false) } descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match { case Some(row) => assert(row.get(1).toString.contains("name,val1.val2.street,val1.val2.city,val1.val2.WindSpeed,val1.val2.year")) + case None => assert(false) } // TODO: Add a validation http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index f1fa9b3..0127d7e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -50,6 +50,10 @@ private[sql] case class CarbonProjectForDeleteCommand( throw new MalformedCarbonCommandException("Unsupported operation 
on non transactional table") } + if (SegmentStatusManager.isCompactionInProgress(carbonTable)) { + throw new ConcurrentOperationException(carbonTable, "compaction", "data delete") + } + if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data delete") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 964407f..4e9c1af 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -70,6 +70,9 @@ private[sql] case class CarbonProjectForUpdateCommand( if (!carbonTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } + if (SegmentStatusManager.isCompactionInProgress(carbonTable)) { + throw new ConcurrentOperationException(carbonTable, "compaction", "data update") + } if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data update") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 9e1e0e4..41dfea5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -123,41 +123,44 @@ private[sql] case class CarbonDescribeFormattedCommand( } var isLocalDictEnabled = tblProps.asScala - .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, - CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT) - val localDictEnabled = isLocalDictEnabled.split(",") { 0 } - results ++= Seq(("Local Dictionary Enabled", localDictEnabled, "")) - // if local dictionary is enabled, then only show other properties of local dictionary - if (localDictEnabled.toBoolean) { - var localDictThreshold = tblProps.asScala - .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, - CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT) - val localDictionaryThreshold = localDictThreshold.split(",") - localDictThreshold = localDictionaryThreshold { 0 } - results ++= Seq(("Local Dictionary Threshold", localDictThreshold, "")) - val columns = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala - val builder = new StringBuilder - columns.foreach { column => - if (column.isLocalDictColumn && !column.isInvisible) { - builder.append(column.getColumnName).append(",") - } - } - results ++= - Seq(("Local Dictionary Include", getDictColumnString(builder.toString().split(",")), "")) - if (tblProps.asScala - .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) { + .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) + if (isLocalDictEnabled.isDefined) { + val localDictEnabled = isLocalDictEnabled.get.split(",") { 0 } + results ++= 
Seq(("Local Dictionary Enabled", localDictEnabled, "")) + // if local dictionary is enabled, then only show other properties of local dictionary + if (localDictEnabled.toBoolean) { + var localDictThreshold = tblProps.asScala + .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, + CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT) + val localDictionaryThreshold = localDictThreshold.split(",") + localDictThreshold = localDictionaryThreshold { 0 } + results ++= Seq(("Local Dictionary Threshold", localDictThreshold, "")) val columns = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala val builder = new StringBuilder columns.foreach { column => - if (!column.isLocalDictColumn && !column.isInvisible && - (column.getDataType.equals(DataTypes.STRING) || - column.getDataType.equals(DataTypes.VARCHAR))) { + if (column.isLocalDictColumn && !column.isInvisible) { builder.append(column.getColumnName).append(",") } } results ++= - Seq(("Local Dictionary Exclude", getDictColumnString(builder.toString().split(",")), "")) + Seq(("Local Dictionary Include", getDictColumnString(builder.toString().split(",")), "")) + if (tblProps.asScala + .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) { + val columns = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + val builder = new StringBuilder + columns.foreach { column => + if (!column.isLocalDictColumn && !column.isInvisible && + (column.getDataType.equals(DataTypes.STRING) || + column.getDataType.equals(DataTypes.VARCHAR))) { + builder.append(column.getColumnName).append(",") + } + } + results ++= + Seq(("Local Dictionary Exclude", getDictColumnString(builder.toString().split(",")), "")) + } } + } else { + results ++= Seq(("Local Dictionary Enabled", "false", "")) } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff 
--git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 96b191f..cab9de5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -346,7 +346,17 @@ object AlterTableUtil { // since thriftTable also holds comment as its property. propKeys.foreach { propKey => if (validateTableProperties(propKey)) { - tblPropertiesMap.remove(propKey.toLowerCase) + // This check is required because for old tables we need to keep same behavior for it, + // meaning, local dictionary should be disabled. To enable we can use set command for + // older tables. So no need to remove from table properties map for unset just to ensure + // for older table behavior. So in case of unset, if enable property is already present + // in map, then just set it to default value of local dictionary which is true. + if (!propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) { + tblPropertiesMap.remove(propKey.toLowerCase) + } else { + tblPropertiesMap + .put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT) + } } else { val errorMessage = "Error: Invalid option(s): " + propKey throw new MalformedCarbonCommandException(errorMessage)