[CARBONDATA-1763] Drop the table if an exception is thrown during creation

A pre-aggregate table was not getting dropped when its creation failed, because
exceptions from undoMetadata were not handled, and because a pre-aggregate
table that was never registered with the main table (i.e. the main table
update failed) was not dropped from the metastore.

This closes #1951

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1be27b08
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1be27b08
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1be27b08

Branch: refs/heads/carbonstore-rebase
Commit: 1be27b085696937505acfbf62079c5ca31ad347c
Parents: 957a51f
Author: kunal642 <kunalkapoor...@gmail.com>
Authored: Thu Feb 8 11:50:23 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Feb 9 16:22:03 2018 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  2 +-
 .../datamap/CarbonDropDataMapCommand.scala      | 29 ++++++++++++++++----
 .../CreatePreAggregateTableCommand.scala        |  3 +-
 4 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5c43d58..8ed7623 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -546,7 +546,7 @@ object CarbonDataRDDFactory {
           LOGGER.error(ex, "Problem while committing data maps")
           false
       }
-      if (!done && !commitComplete) {
+      if (!done || !commitComplete) {
        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
        LOGGER.info("********starting clean up**********")
        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index f2f001e..0fd5437 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -123,7 +123,7 @@ case class CarbonCreateDataMapCommand(
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      if (!tableIsExists) {
+      if (!tableIsExists && createPreAggregateTableCommands != null) {
         createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
       } else {
         Seq.empty
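
A note on the CarbonDataRDDFactory hunk: with &&, the failure cleanup ran only
when the load and the datamap commit had BOTH failed, so a successful load whose
datamap commit failed left a stale segment and table status behind. The sketch
below is a minimal standalone illustration of why the condition must be ||;
runLoad, commitDataMaps, and cleanUp are hypothetical stand-ins, not CarbonData
APIs:

    object CleanupConditionSketch {
      // Hypothetical stand-ins for the data load and the datamap commit.
      def runLoad(): Boolean = true           // the load itself succeeded
      def commitDataMaps(): Boolean = false   // committing the datamaps failed

      def cleanUp(): Unit =
        println("marking table status as failed and deleting the segment")

      def main(args: Array[String]): Unit = {
        val done = runLoad()
        val commitComplete = commitDataMaps()
        // The old condition (!done && !commitComplete) is false here, since
        // done is true, so the half-finished load was never cleaned up.
        // The fixed condition cleans up if EITHER step failed.
        if (!done || !commitComplete) {
          cleanUp()
        }
      }
    }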
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index bc55988..8ef394c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -47,7 +47,8 @@ case class CarbonDropDataMapCommand(
     dataMapName: String,
     ifExistsSet: Boolean,
     databaseNameOp: Option[String],
-    tableName: String)
+    tableName: String,
+    forceDrop: Boolean = false)
   extends AtomicRunnableCommand {

   var commandToRun: CarbonDropTableCommand = _

@@ -74,6 +75,10 @@ case class CarbonDropDataMapCommand(
         case ex: NoSuchTableException =>
           throw ex
       }
+      // If the datamap to be dropped is registered in the parent table, then drop the datamap
+      // from the metastore and remove its entry from the parent table.
+      // If forceDrop is true, then remove the datamap from the Hive metastore. There is no need
+      // to remove it from the parent, as the first condition will have taken care of that.
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -85,7 +90,6 @@ case class CarbonDropDataMapCommand(
             ifExistsSet,
             sparkSession)
           OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
-          carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
           val schemaConverter = new ThriftWrapperSchemaConverterImpl
           PreAggregateUtil.updateSchemaInfo(
@@ -111,13 +115,28 @@ case class CarbonDropDataMapCommand(
       } else if (!ifExistsSet) {
         throw new NoSuchDataMapException(dataMapName, tableName)
       }
-    } else if ((carbonTable.isDefined &&
-      carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0)) {
+    } else if (forceDrop) {
+      val childCarbonTable: Option[CarbonTable] = try {
+        val childTableName = tableName + "_" + dataMapName
+        Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
+      } catch {
+        case _: Exception =>
+          None
+      }
+      if (childCarbonTable.isDefined) {
+        commandToRun = CarbonDropTableCommand(
+          ifExistsSet = true,
+          Some(childCarbonTable.get.getDatabaseName),
+          childCarbonTable.get.getTableName,
+          dropChildTable = true)
+        commandToRun.processMetadata(sparkSession)
+      }
+    } else if (carbonTable.isDefined &&
+      carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
       if (!ifExistsSet) {
         throw new NoSuchDataMapException(dataMapName, tableName)
       }
     }
-
     } catch {
       case e: NoSuchDataMapException =>
         throw e
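
A note on the forceDrop branch above: when creation fails before the datamap is
registered, the parent's datamap schema list is empty, so the orphan child table
can only be located through the naming convention tableName + "_" + dataMapName.
A minimal standalone sketch of that lookup-then-drop pattern, with a hypothetical
lookupTable in place of CarbonEnv.getCarbonTable and a println in place of the
real drop command:

    import scala.util.Try

    object ForceDropSketch {
      // Hypothetical metastore lookup; the patch uses
      // CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession).
      def lookupTable(name: String): String =
        if (name == "sales_agg_datamap") name
        else throw new RuntimeException(s"table $name not found")

      def main(args: Array[String]): Unit = {
        val tableName = "sales"          // parent table (example value)
        val dataMapName = "agg_datamap"  // datamap whose creation failed
        // The child table follows the <parent>_<datamap> naming convention.
        val childTableName = tableName + "_" + dataMapName
        // A failed lookup simply means there is no orphan table to drop.
        val child: Option[String] = Try(lookupTable(childTableName)).toOption
        child.foreach(t => println(s"force-dropping orphan child table $t"))
      }
    }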
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 54f0390..46d885d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -162,7 +162,8 @@ case class CreatePreAggregateTableCommand(
       dataMapName,
       ifExistsSet = true,
       parentTableIdentifier.database,
-      parentTableIdentifier.table).run(sparkSession)
+      parentTableIdentifier.table,
+      forceDrop = true).run(sparkSession)
     Seq.empty
   }
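
Taken together, the last two hunks form the undo path: if creating the
pre-aggregate table throws after the child table has reached the metastore but
before it is registered with the parent, undoMetadata re-runs the drop command
with forceDrop = true so the orphan is still removed. A simplified sketch of
that create-then-undo flow, with println stubs standing in for the real
commands:

    object UndoFlowSketch {
      // Stand-in for CreatePreAggregateTableCommand.processMetadata: the child
      // table is created, but registering it with the main table fails.
      def createPreAggregateTable(): Unit =
        throw new RuntimeException("main table schema update failed")

      // Stand-in for CarbonDropDataMapCommand; forceDrop = true drops the child
      // table from the metastore even though the parent never registered it.
      def dropDataMap(forceDrop: Boolean): Unit =
        println(s"dropping child table (forceDrop = $forceDrop)")

      def main(args: Array[String]): Unit = {
        try {
          createPreAggregateTable()
        } catch {
          case e: Exception =>
            dropDataMap(forceDrop = true) // the undoMetadata path
            println(s"creation failed: ${e.getMessage}")
        }
      }
    }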