This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f2db78a  [CARBONDATA-3455] Job Group ID is not displayed for the IndexServer Jobs
f2db78a is described below

commit f2db78ad5398a4575cf29fcf4ee02203ff558df0
Author: dhatchayani <dhatcha.offic...@gmail.com>
AuthorDate: Thu Jun 27 15:59:13 2019 +0530

    [CARBONDATA-3455] Job Group ID is not displayed for the IndexServer Jobs
    
    Job Group ID is not displayed for the IndexServer Jobs as it is not set.
    
    This closes #3309
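
For context, Spark carries the job group through thread-local properties: SparkContext.setJobGroup(groupId, description) stores "spark.jobGroup.id" and "spark.job.description" on the calling thread, and the scheduler tags every job submitted from that thread with them. Work submitted from another thread or another SparkContext (as the IndexServer does) sees null unless the values are forwarded and re-applied, which is what this patch does. A minimal sketch of that mechanism (illustrative only, not part of the commit; the group id "query-42" and the demo description are made up):

    import org.apache.spark.sql.SparkSession

    object JobGroupDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("jobGroupDemo").getOrCreate()
        val sc = spark.sparkContext

        // setJobGroup stores the group id and description as thread-local properties.
        sc.setJobGroup("query-42", "demo pruning query")
        assert(sc.getLocalProperty("spark.jobGroup.id") == "query-42")

        // A thread (or remote service) that never called setJobGroup must re-apply
        // the forwarded values before submitting work, otherwise its jobs show up
        // in the Spark UI with no group id.
        sc.setLocalProperty("spark.jobGroup.id", "query-42")
        sc.setLocalProperty("spark.job.description", "demo pruning query")
        sc.range(0, 10).count() // reported under group "query-42" in the Spark UI

        spark.stop()
      }
    }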
---
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   | 29 ++++++++++++++++++++++
 .../carbondata/indexserver/DataMapJobs.scala       | 12 ++-------
 .../carbondata/indexserver/IndexServer.scala       |  5 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala           |  4 ++-
 .../sql/execution/command/cache/CacheUtil.scala    |  9 +++++--
 .../command/cache/CarbonDropCacheCommand.scala     |  2 +-
 .../command/cache/CarbonShowCacheCommand.scala     |  2 +-
 .../mutation/CarbonProjectForDeleteCommand.scala   |  2 +-
 .../mutation/CarbonProjectForUpdateCommand.scala   |  3 ++-
 .../command/mutation/DeleteExecution.scala         |  4 +--
 10 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index ec1014a..8f39f9b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -220,4 +220,33 @@ object SparkSQLUtil {
   def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = {
     new SerializableConfiguration(hadoopConf)
   }
+
+  /**
+   * Get the task group id
+   *
+   * @param sparkSession
+   * @return
+   */
+  def getTaskGroupId(sparkSession: SparkSession): String = {
+    val taskGroupId = sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+      case null => ""
+      case _ => sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id")
+    }
+    taskGroupId
+  }
+
+  /**
+   * Get the task group description
+   *
+   * @param sparkSession
+   * @return
+   */
+  def getTaskGroupDesc(sparkSession: SparkSession): String = {
+    val taskGroupDesc = sparkSession.sparkContext.getLocalProperty("spark.job.description") match {
+      case null => ""
+      case _ => sparkSession.sparkContext.getLocalProperty("spark.job.description")
+    }
+    taskGroupDesc
+  }
+
 }
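
The two helpers above only null-guard the thread-local properties so that an unset value is forwarded to the index server as an empty string rather than null. An Option-based equivalent that reads each property once (a sketch for illustration, not the committed code) would be:

    import org.apache.spark.sql.SparkSession

    // Illustrative sketch only; the committed helpers use pattern matching on null instead.
    object TaskGroupProps {
      def getTaskGroupId(sparkSession: SparkSession): String =
        Option(sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id")).getOrElse("")

      def getTaskGroupDesc(sparkSession: SparkSession): String =
        Option(sparkSession.sparkContext.getLocalProperty("spark.job.description")).getOrElse("")
    }

Either form avoids forwarding a null when no job group was ever set on the calling thread.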
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 75dade4..01b8824 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -57,16 +57,8 @@ class DistributedDataMapJob extends AbstractDataMapJob {
     val (resonse, time) = logTime {
       try {
         val spark = SparkSQLUtil.getSparkSession
-        val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-          case null => ""
-          case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
-        }
-        val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
-          case null => ""
-          case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-        }
-        dataMapFormat.setTaskGroupId(taskGroupId)
-        dataMapFormat.setTaskGroupDesc(taskGroupDesc)
+        dataMapFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
+        dataMapFormat.setTaskGroupDesc(SparkSQLUtil.getTaskGroupDesc(spark))
         var filterInf = dataMapFormat.getFilterResolverIntf
         val filterProcessor = new FilterExpressionProcessor
         filterInf = removeSparkUnknown(filterInf,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 0a49bb9..f43f893 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -57,7 +57,7 @@ trait ServerInterface {
   * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
    */
   def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String]): Unit
+      segmentIds: Array[String], jobGroupId: String = ""): Unit
 }
 
 /**
@@ -126,11 +126,12 @@ object IndexServer extends ServerInterface {
   }
 
   override def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String]): Unit = doAs {
+      segmentIds: Array[String], jobGroupId: String = ""): Unit = doAs {
     val databaseName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
    val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
     new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
       .collect()
     if (segmentIds.nonEmpty) {
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 447d05b..2e5f0ea 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -259,7 +259,9 @@ object CarbonDataRDDFactory {
               carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
             try {
               IndexServer.getClient.invalidateSegmentCache(carbonLoadModel
-                .getCarbonDataLoadSchema.getCarbonTable, compactedSegments.asScala.toArray)
+                .getCarbonDataLoadSchema.getCarbonTable,
+                compactedSegments.asScala.toArray,
+                SparkSQLUtil.getTaskGroupId(sqlContext.sparkSession))
             } catch {
               case ex: Exception =>
                 LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index c257699..8975027 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheType
@@ -45,7 +47,7 @@ object CacheUtil {
    * @param carbonTable
    * @return List of all index files
    */
-  def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
+  def getAllIndexFiles(carbonTable: CarbonTable)(sparkSession: SparkSession): List[String] = {
     if (carbonTable.isTransactionalTable) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
      val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
@@ -56,7 +58,10 @@ object CacheUtil {
        val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
           .map(_.getSegmentNo).toArray
         try {
-          IndexServer.getClient.invalidateSegmentCache(carbonTable, invalidSegmentIds)
+          IndexServer.getClient
+            .invalidateSegmentCache(carbonTable,
+              invalidSegmentIds,
+              SparkSQLUtil.getTaskGroupId(sparkSession))
         } catch {
           case e: Exception =>
             LOGGER.warn("Failed to clear cache from executors. ", e)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index 909c196..1554f6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -55,7 +55,7 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
         carbonTable.getTableName)) {
        DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
       } else {
-        val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
+        val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession)
        // Extract dictionary keys for the table and create cache keys from those
         val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index b766cec..45e811a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -82,7 +82,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
       Checker
        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
-      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession).size
      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
      (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 3521c97..ae1d848 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -116,7 +116,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
         isUpdateOperation = false)
 
-      DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)
+      DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession)
 
       if (executorErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executorErrors.errorMsg)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 20a8435..b620e38 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -158,7 +158,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
             executionErrors,
             segmentsToBeDeleted)
 
-          DeleteExecution.clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)
+          DeleteExecution
+            .clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)(sparkSession)
 
         } else {
          throw new ConcurrentOperationException(carbonTable, "compaction", "update")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 3de8dd4..f9428a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -317,13 +317,13 @@ object DeleteExecution {
   }
 
   def clearDistributedSegmentCache(carbonTable: CarbonTable,
-      segmentsToBeCleared: Seq[Segment]): Unit = {
+      segmentsToBeCleared: Seq[Segment])(sparkSession: SparkSession): Unit = {
     if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
       .getDatabaseName, carbonTable.getTableName)) {
       try {
         IndexServer.getClient
          .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
-          .toArray)
+          .toArray, SparkSQLUtil.getTaskGroupId(sparkSession))
       } catch {
         case _: Exception =>
           LOGGER.warn(s"Clearing of invalid segments for ${
