Kejian-Li commented on a change in pull request #4004:
URL: https://github.com/apache/carbondata/pull/4004#discussion_r530818500



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
##########
@@ -17,20 +17,213 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
-import org.apache.spark.sql._
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
+import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.hive.HiveSessionCatalog
+import org.apache.spark.storage.StorageLevel
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.view.{MVSchema, MVStatus}
+import org.apache.carbondata.events.{Event, OperationContext, 
OperationListenerBus, UpdateTablePostEvent}
+import org.apache.carbondata.view.MVManagerInSpark
+
 
 /**
  * Util for IUD common function
  */
 object IUDCommonUtil {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def tryHorizontalCompaction(sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      updatedSegmentList: Set[String]): Unit = {
+    var hasCompactionException = false
+    var compactTimestamp = ""
+    try {
+      HorizontalCompaction.tryHorizontalCompaction(
+        sparkSession, carbonTable, updatedSegmentList)
+    } catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error(
+          "Update operation passed. Exception in Horizontal Compaction. Please 
check logs." + e)
+        // In case of failure , clean all related delta files
+        compactTimestamp = e.compactionTimeStamp.toString
+        hasCompactionException = true
+    } finally {
+      if (hasCompactionException) {
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, compactTimestamp)
+      }
+    }
+  }
+
+  def refreshMVandIndex(sparkSession: SparkSession,
+      carbonTable: CarbonTable, operationContext: OperationContext, event: 
Event): Unit = {
+    if (CarbonProperties.getInstance().isMVEnabled) {
+      var hasMaintainMVException = false
+      val viewManager = MVManagerInSpark.get(sparkSession)
+      var viewSchemas: util.List[MVSchema] = new util.ArrayList
+      try {
+        // Truncate materialized views on the current table.
+        viewSchemas = viewManager.getSchemasOnTable(carbonTable)
+        if (!viewSchemas.isEmpty) {
+          viewManager.onTruncate(viewSchemas)
+        }
+        // Load materialized views on the current table.
+        OperationListenerBus.getInstance.fireEvent(event, operationContext)
+      } catch {
+        case e: Exception =>
+          hasMaintainMVException = true
+          LOGGER.error("Maintain MV in Update operation failed. Please check 
logs." + e)
+      } finally {
+        if (hasMaintainMVException) {
+          viewManager.setStatus(viewSchemas, MVStatus.DISABLED)
+        }
+      }
+    }
+  }
+
+
+  def coalesceDataSetIfNeeded(dataset: Dataset[Row],
+      nonEmptyPartitionCount: Long,
+      isPersistEnabled: Boolean): Dataset[Row] = {
+    val ratioOfNonEmptyPartition: Float = nonEmptyPartitionCount / 
dataset.rdd.getNumPartitions
+    var coalescedDataSet: Dataset[Row] = dataset
+    if (ratioOfNonEmptyPartition < 0.5f) {
+      coalescedDataSet = dataset.coalesce(nonEmptyPartitionCount.toInt)
+    }
+    if (isPersistEnabled) {
+      coalescedDataSet = coalescedDataSet.persist(
+        StorageLevel.fromString(CarbonProperties.getInstance()
+          .getUpdateDatasetStorageLevel()))
+    }
+    coalescedDataSet
+  }
+
+  def countNonEmptyPartitions(sparkSession: SparkSession, dataset: 
Dataset[Row],
+      carbonTable: CarbonTable, uuid: String): Long = {
+    val metricName = "nonEmptyPart"
+    val accumulatorName = getAccumulatorName(carbonTable, uuid, metricName)
+    val nonEmptyPart = 
sparkSession.sparkContext.longAccumulator(accumulatorName)
+    dataset.foreachPartition(partition =>
+      if (!partition.isEmpty) {
+        nonEmptyPart.add(1)
+      }
+    )
+    nonEmptyPart.value
+  }
+
+  def getAccumulatorName(carbonTable: CarbonTable, uuid: String, metricName: 
String): String = {
+    s"${carbonTable.getTableId}_${uuid}_${metricName}"
+  }
+
+  def uniqueValueCheck(dataset: Dataset[Row]): Unit = {
+    val ds = 
dataset.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+      .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+      .count()
+      .select("count")
+      .filter(col("count") > lit(1))
+      .limit(1)
+      .collect()
+    // tupleId represents the source rows that are going to get replaced.
+    // If same tupleId appeared more than once means key has more than one 
value to replace.
+    // which is undefined behavior.
+    if (ds.length > 0 && ds(0).getLong(0) > 1) {
+      throw new UnsupportedOperationException(
+        " update cannot be supported for 1 to N mapping, as more than one 
value present " +
+          "for the update key")
+    }
+  }
+
+  def checkPreconditionsForDelete(sparkSession: SparkSession,
+      logicalPlan: LogicalPlan,
+      carbonTable: CarbonTable): Unit = {
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan)
+    IUDCommonUtil.checkIsTranstionTable(carbonTable)
+    IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable)
+    IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.DELETE)
+    IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable)
+  }
+
+  def checkPreconditionsForUpdate(sparkSession: SparkSession,
+      logicalPlan: LogicalPlan,
+      carbonTable: CarbonTable,
+      columns: List[String]): Unit = {
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan)
+    IUDCommonUtil.checkIsTranstionTable(carbonTable)
+    IUDCommonUtil.checkIfSpartialColumnsExists(carbonTable, columns)
+    IUDCommonUtil.checkIfColumnWithComplexTypeExists(carbonTable, columns)
+    IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable)
+    IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.UPDATE)
+    IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable)
+  }
+
+  def checkIsHeterogeneousSegmentTable(carbonTable: CarbonTable): Unit = {
+    if 
(MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
+      throw new MalformedCarbonCommandException(
+        s"Unsupported operation on table containing mixed format segments")
+    }
+  }
+
+  def checkIsIndexedTable(carbonTable: CarbonTable, operation: 
TableOperation): Unit = {
+    if (!carbonTable.canAllow(carbonTable, operation)) {
+      throw new MalformedCarbonCommandException(
+        "update/delete operation is not supported for index")
+    }
+  }
+
+  def checkIsLoadInProgressInTable(carbonTable: CarbonTable): Unit = {
+    if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "loading", "data 
update")
+    }
+  }
+
+  def checkIsTranstionTable(carbonTable: CarbonTable): Unit = {
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
+  }
+
+  def checkIfColumnWithComplexTypeExists(carbonTable: CarbonTable, columns: 
List[String]): Unit = {
+    columns.foreach { col =>
+      val dataType = 
carbonTable.getColumnByName(col).getColumnSchema.getDataType
+      if (dataType.isComplexType) {
+        throw new UnsupportedOperationException("Unsupported operation on 
Complex data type")
+      }
+    }
+  }
+
+  def checkIfSpartialColumnsExists(carbonTable: CarbonTable, columns: 
List[String]): Unit = {

Review comment:
       Spartial   =>   Spatial




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to