This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 6c9bbfe8ec5606262bbbbb93b101986352578b2b
Author: Zhang Zhichao <441586...@qq.com>
AuthorDate: Tue Aug 13 11:00:23 2019 +0800

    [CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql
    
    Return updated/deleted rows count when execute update/delete sql
    
    This closes #3357
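
    As a rough illustration (a sketch, not part of the patch itself): with this
    change the DataFrame returned by an update/delete statement carries a single
    row holding the affected row count, so a caller can read it roughly as below.
    This assumes a SparkSession named sparkSession and the test_return_row_count
    table created in the tests further down.

        // Read the "Deleted Row Count" value returned by the delete command.
        val deletedCount: Long = sparkSession
          .sql("delete from test_return_row_count where a = 'aaa'")
          .collect()
          .head
          .getLong(0)   // 1L when exactly one row matched, as in the test case
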
---
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  | 19 +++++++++++++
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 33 ++++++++++++++++++++++
 .../scala/org/apache/carbondata/spark/KeyVal.scala | 10 +++----
 .../apache/spark/util/CarbonReflectionUtils.scala  | 16 +++++++++++
 .../apache/spark/sql/CarbonCatalystOperators.scala |  6 ++--
 .../mutation/CarbonProjectForDeleteCommand.scala   | 21 ++++++++++----
 .../mutation/CarbonProjectForUpdateCommand.scala   | 19 ++++++++-----
 .../command/mutation/DeleteExecution.scala         | 27 ++++++++++--------
 .../spark/sql/hive/CarbonAnalysisRules.scala       | 12 +++++++-
 9 files changed, 129 insertions(+), 34 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index f26283b..4565d7a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -361,6 +361,25 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists decimal_table")
   }
 
+  test("[CARBONDATA-3491] Return updated/deleted rows count when execute 
update/delete sql") {
+    sql("drop table if exists test_return_row_count")
+
+    sql("create table test_return_row_count (a string, b string, c string) 
stored by 'carbondata'").show()
+    sql("insert into test_return_row_count select 'aaa','bbb','ccc'").show()
+    sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show()
+    sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+    sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+
+    checkAnswer(sql("delete from test_return_row_count where a = 'aaa'"),
+        Seq(Row(1))
+    )
+    checkAnswer(sql("select * from test_return_row_count"),
+        Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc", 
"bbb", "ccc"))
+    )
+
+    sql("drop table if exists test_return_row_count").show()
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud_db cascade")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index cf45600..ef18035 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -826,6 +826,39 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table iud.dest11""").show
   }
 
+  test("[CARBONDATA-3491] Return updated/deleted rows count when execute 
update/delete sql") {
+    sql("drop table if exists test_return_row_count")
+    sql("drop table if exists test_return_row_count_source")
+
+    sql("create table test_return_row_count (a string, b string, c string) 
stored by 'carbondata'").show()
+    sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show()
+    sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+    sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+
+    sql("create table test_return_row_count_source (a string, b string, c 
string) stored by 'carbondata'").show()
+    sql("insert into test_return_row_count_source select 
'aaa','eee','ccc'").show()
+    sql("insert into test_return_row_count_source select 
'bbb','bbb','ccc'").show()
+    sql("insert into test_return_row_count_source select 
'ccc','bbb','ccc'").show()
+    sql("insert into test_return_row_count_source select 
'ccc','bbb','ccc'").show()
+
+    checkAnswer(sql("update test_return_row_count set (b) = ('ddd') where a = 
'ccc'"),
+        Seq(Row(2))
+    )
+    checkAnswer(sql("select * from test_return_row_count"),
+        Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "ddd", "ccc"), Row("ccc", 
"ddd", "ccc"))
+    )
+
+    checkAnswer(sql("update test_return_row_count t set (t.b) = (select s.b 
from test_return_row_count_source s where s.a = 'aaa') where t.a = 'ccc'"),
+        Seq(Row(2))
+    )
+    checkAnswer(sql("select * from test_return_row_count"),
+        Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "eee", "ccc"), Row("ccc", 
"eee", "ccc"))
+    )
+
+    sql("drop table if exists test_return_row_count")
+    sql("drop table if exists test_return_row_count_source")
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index c4b1144..9fca245 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -76,22 +76,20 @@ class updateResultImpl
 }
 
 trait DeleteDelataResult[K, V] extends Serializable {
-  def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
+  def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors, Long)): (K, V)
 }
 
 class DeleteDelataResultImpl
-  extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)] {
+  extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] {
   override def getKey(key: SegmentStatus,
-      value: (SegmentUpdateDetails, ExecutionErrors)): (SegmentStatus, (SegmentUpdateDetails,
-    ExecutionErrors)) = {
+      value: (SegmentUpdateDetails, ExecutionErrors, Long)): (SegmentStatus, (SegmentUpdateDetails,
+    ExecutionErrors, Long)) = {
     (key, value)
   }
 }
 
-
 trait PartitionResult[K, V] extends Serializable {
   def getKey(key: Int, value: Boolean): (K, V)
-
 }
 
 class PartitionResultImpl extends PartitionResult[Int, Boolean] {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 4fc30d02..46692df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -381,4 +382,19 @@ object CarbonReflectionUtils {
     nameField.setAccessible(true)
     nameField.set(caseObj, objToSet)
   }
+
+  def invokeAnalyzerExecute(analyzer: Analyzer,
+      plan: LogicalPlan): LogicalPlan = {
+    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val method: Method = analyzer.getClass
+        .getMethod("execute", classOf[LogicalPlan])
+      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+      val method: Method = analyzer.getClass
+        .getMethod("executeAndCheck", classOf[LogicalPlan])
+      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 160c785..5f745d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -68,8 +68,8 @@ case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr
 case class ProjectForUpdate(
     table: UnresolvedRelation,
     columns: List[String],
-    children: Seq[LogicalPlan] ) extends LogicalPlan {
-  override def output: Seq[AttributeReference] = Seq.empty
+    children: Seq[LogicalPlan]) extends LogicalPlan {
+  override def output: Seq[Attribute] = Seq.empty
 }
 
 case class UpdateTable(
@@ -79,7 +79,7 @@ case class UpdateTable(
     alias: Option[String] = None,
     filer: String) extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = Seq.empty
+  override def output: Seq[Attribute] = Seq.empty
 }
 
 case class DeleteRecords(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index ae1d848..45c73ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.command.mutation
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.types.LongType
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -48,6 +50,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
     timestamp: String)
   extends DataCommand {
 
+  override val output: Seq[Attribute] = {
+    Seq(AttributeReference("Deleted Row Count", LongType, nullable = false)())
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
@@ -104,7 +110,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      val deletedSegments = DeleteExecution.deleteDeltaExecution(
+      val (deletedSegments, deletedRowCount) = DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
         sparkSession,
@@ -112,16 +118,18 @@ private[sql] case class CarbonProjectForDeleteCommand(
         timestamp,
         isUpdateOperation = false,
         executorErrors)
+
+      // Check for any failures that occurred during delete delta execution
+      if (executorErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executorErrors.errorMsg)
+      }
+
       // call IUD Compaction.
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
         isUpdateOperation = false)
 
       DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession)
 
-      if (executorErrors.failureCauses != FailureCauses.NONE) {
-        throw new Exception(executorErrors.errorMsg)
-      }
-
       val allDataMapSchemas = DataMapStoreManager.getInstance
         .getDataMapSchemasOfTable(carbonTable).asScala
         .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
@@ -134,11 +142,13 @@ private[sql] case class CarbonProjectForDeleteCommand(
       val deleteFromTablePostEvent: DeleteFromTablePostEvent =
         DeleteFromTablePostEvent(sparkSession, carbonTable)
       OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
+      Seq(Row(deletedRowCount))
     } catch {
       case e: HorizontalCompactionException =>
         LOGGER.error("Delete operation passed. Exception in Horizontal 
Compaction." +
                      " Please check logs. " + e.getMessage)
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, 
e.compactionTimeStamp.toString)
+        Seq(Row(0L))
 
       case e: Exception =>
         LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
@@ -160,7 +170,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
       updateLock.unlock()
       compactionLock.unlock()
     }
-    Seq.empty
   }
 
   override protected def opName: String = "DELETE DATA"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index b620e38..686990d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.execution.command.mutation
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.ArrayType
+import org.apache.spark.sql.types.{ArrayType, LongType}
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -50,8 +51,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
     columns: List[String])
   extends DataCommand {
 
+  override val output: Seq[Attribute] = {
+    Seq(AttributeReference("Updated Row Count", LongType, nullable = false)())
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    var updatedRowCount = 0L
     IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val res = plan find {
       case relation: LogicalRelation if relation.relation
@@ -61,7 +67,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
     }
 
     if (res.isEmpty) {
-      return Seq.empty
+      return Array(Row(updatedRowCount)).toSeq
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     if (carbonTable.getPartitionInfo != null &&
@@ -71,6 +77,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
       throw new UnsupportedOperationException("Unsupported update operation for range/" +
         "hash/list partition table")
     }
+
     setAuditTable(carbonTable)
     setAuditInfo(Map("plan" -> plan.simpleString))
     columns.foreach { col =>
@@ -135,7 +142,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
           CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
           // do delete operation.
-          val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
+          val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution(
             databaseNameOp,
             tableName,
             sparkSession,
@@ -148,6 +155,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
             throw new Exception(executionErrors.errorMsg)
           }
 
+          updatedRowCount = updatedRowCountTemp
           // do update operation.
           performUpdate(dataSet,
             databaseNameOp,
@@ -217,7 +225,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
         CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
       }
     }
-    Seq.empty
+    Seq(Row(updatedRowCount))
   }
 
   private def performUpdate(
@@ -304,9 +312,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
     executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
     executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
-
-    Seq.empty
-
   }
 
   override protected def opName: String = "UPDATE DATA"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index f9428a2..cb86cb5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -64,14 +64,15 @@ object DeleteExecution {
       dataRdd: RDD[Row],
       timestamp: String,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors): Seq[Segment] = {
+      executorErrors: ExecutionErrors): (Seq[Segment], Long) = {
 
-    var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
+    var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] = null
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val tablePath = absoluteTableIdentifier.getTablePath
     var segmentsTobeDeleted = Seq.empty[Segment]
+    var operatedRowCount = 0L
 
     val deleteRdd = if (isUpdateOperation) {
       val schema =
@@ -97,7 +98,7 @@ object DeleteExecution {
 
     // if no loads are present then no need to do anything.
     if (keyRdd.partitions.length == 0) {
-      return segmentsTobeDeleted
+      return (segmentsTobeDeleted, operatedRowCount)
     }
     val blockMappingVO =
       carbonInputFormat.getBlockRowCount(
@@ -124,9 +125,9 @@ object DeleteExecution {
     val rdd = rowContRdd.join(keyRdd)
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
-        Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] {
+        Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] {
           ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
-          var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
+          var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]()
           while (records.hasNext) {
             val ((key), (rowCountDetailsVO, groupedRows)) = records.next
             val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
@@ -143,7 +144,7 @@ object DeleteExecution {
 
     // if no loads are present then no need to do anything.
     if (res.flatten.isEmpty) {
-      return segmentsTobeDeleted
+      return (segmentsTobeDeleted, operatedRowCount)
     }
 
     // update new status file
@@ -217,7 +218,7 @@ object DeleteExecution {
         timestamp: String,
         rowCountDetailsVO: RowCountDetailsVO,
         isStandardTable: Boolean
-    ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = {
+    ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {
 
       val result = new DeleteDelataResultImpl()
       var deleteStatus = SegmentStatus.LOAD_FAILURE
@@ -228,7 +229,8 @@ object DeleteExecution {
           CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
       val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
       val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
-      val resultIter = new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] {
+      val resultIter =
+        new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
         val segmentUpdateDetails = new SegmentUpdateDetails()
         var TID = ""
         var countOfRows = 0
@@ -305,15 +307,18 @@ object DeleteExecution {
           }
         }
 
-        override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)) = {
+        override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)) = {
           finished = true
-          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
+          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors, countOfRows.toLong))
         }
       }
       resultIter
     }
 
-    segmentsTobeDeleted
+    if (executorErrors.failureCauses == FailureCauses.NONE) {
+       operatedRowCount = res.flatten.map(_._2._3).sum
+    }
+    (segmentsTobeDeleted, operatedRowCount)
   }
 
   def clearDistributedSegmentCache(carbonTable: CarbonTable,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 9ba2301..9b923b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.util.CarbonUtil
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
   private lazy val parser = sparkSession.sessionState.sqlParser
+  private lazy val optimizer = sparkSession.sessionState.optimizer
+  private lazy val analyzer = sparkSession.sessionState.analyzer
 
   private def processUpdateQuery(
       table: UnresolvedRelation,
@@ -181,7 +183,15 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     }
     val destinationTable = CarbonReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias)
 
-    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
+    // In Spark 2.1 and 2.2, it uses Analyzer.execute method to transform LogicalPlan
+    // but in Spark 2.3, it uses Analyzer.executeAndCheck method
+    val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(
+        analyzer, ProjectForUpdate(destinationTable, columns, Seq(finalPlan)))
+    // For all commands, they execute eagerly, and will be transformed to
+    // logical plan 'LocalRelation' in analyze phase(please see the code in 'Dataset.logicalPlan'),
+    // so it needs to return logical plan 'CarbonProjectForUpdateCommand' here
+    // instead of 'ProjectForUpdate'
+    optimizer.execute(analyzedPlan)
   }
 
 
