This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new f2db78a  [CARBONDATA-3455] Job Group ID is not displayed for the IndexServer Jobs
f2db78a is described below

commit f2db78ad5398a4575cf29fcf4ee02203ff558df0
Author: dhatchayani <dhatcha.offic...@gmail.com>
AuthorDate: Thu Jun 27 15:59:13 2019 +0530

    [CARBONDATA-3455] Job Group ID is not displayed for the IndexServer Jobs

    Job Group ID is not displayed for the IndexServer Jobs as it is not set.

    This closes #3309
---
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   | 29 ++++++++++++++++++++++
 .../carbondata/indexserver/DataMapJobs.scala       | 12 ++-------
 .../carbondata/indexserver/IndexServer.scala       |  5 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala           |  4 ++-
 .../sql/execution/command/cache/CacheUtil.scala    |  9 +++++--
 .../command/cache/CarbonDropCacheCommand.scala     |  2 +-
 .../command/cache/CarbonShowCacheCommand.scala     |  2 +-
 .../mutation/CarbonProjectForDeleteCommand.scala   |  2 +-
 .../mutation/CarbonProjectForUpdateCommand.scala   |  3 ++-
 .../command/mutation/DeleteExecution.scala         |  4 +--
 10 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index ec1014a..8f39f9b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -220,4 +220,33 @@ object SparkSQLUtil {
   def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = {
     new SerializableConfiguration(hadoopConf)
   }
+
+  /**
+   * Get the task group id
+   *
+   * @param sparkSession
+   * @return
+   */
+  def getTaskGroupId(sparkSession: SparkSession): String = {
+    val taskGroupId = sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+      case null => ""
+      case _ => sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id")
+    }
+    taskGroupId
+  }
+
+  /**
+   * Get the task group description
+   *
+   * @param sparkSession
+   * @return
+   */
+  def getTaskGroupDesc(sparkSession: SparkSession): String = {
+    val taskGroupDesc = sparkSession.sparkContext.getLocalProperty("spark.job.description") match {
+      case null => ""
+      case _ => sparkSession.sparkContext.getLocalProperty("spark.job.description")
+    }
+    taskGroupDesc
+  }
+
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 75dade4..01b8824 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -57,16 +57,8 @@ class DistributedDataMapJob extends AbstractDataMapJob {
     val (resonse, time) = logTime {
       try {
         val spark = SparkSQLUtil.getSparkSession
-        val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-          case null => ""
-          case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
-        }
-        val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
-          case null => ""
-          case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-        }
-        dataMapFormat.setTaskGroupId(taskGroupId)
-        dataMapFormat.setTaskGroupDesc(taskGroupDesc)
+        dataMapFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
+        dataMapFormat.setTaskGroupDesc(SparkSQLUtil.getTaskGroupDesc(spark))
         var filterInf = dataMapFormat.getFilterResolverIntf
         val filterProcessor = new FilterExpressionProcessor
         filterInf = removeSparkUnknown(filterInf,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 0a49bb9..f43f893 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -57,7 +57,7 @@ trait ServerInterface {
    * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
    */
   def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String]): Unit
+      segmentIds: Array[String], jobGroupId: String = ""): Unit
 }
 
 /**
@@ -126,11 +126,12 @@ object IndexServer extends ServerInterface {
   }
 
   override def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String]): Unit = doAs {
+      segmentIds: Array[String], jobGroupId: String = ""): Unit = doAs {
     val databaseName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
     sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
     new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
       .collect()
     if (segmentIds.nonEmpty) {
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 447d05b..2e5f0ea 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -259,7 +259,9 @@ object CarbonDataRDDFactory {
             carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
           try {
             IndexServer.getClient.invalidateSegmentCache(carbonLoadModel
-              .getCarbonDataLoadSchema.getCarbonTable, compactedSegments.asScala.toArray)
+              .getCarbonDataLoadSchema.getCarbonTable,
+              compactedSegments.asScala.toArray,
+              SparkSQLUtil.getTaskGroupId(sqlContext.sparkSession))
           } catch {
             case ex: Exception =>
               LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index c257699..8975027 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheType
@@ -45,7 +47,7 @@ object CacheUtil {
    * @param carbonTable
    * @return List of all index files
    */
-  def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
+  def getAllIndexFiles(carbonTable: CarbonTable)(sparkSession: SparkSession): List[String] = {
     if (carbonTable.isTransactionalTable) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
@@ -56,7 +58,10 @@ object CacheUtil {
       val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
         .map(_.getSegmentNo).toArray
       try {
-        IndexServer.getClient.invalidateSegmentCache(carbonTable, invalidSegmentIds)
+        IndexServer.getClient
+          .invalidateSegmentCache(carbonTable,
+            invalidSegmentIds,
+            SparkSQLUtil.getTaskGroupId(sparkSession))
       } catch {
         case e: Exception =>
           LOGGER.warn("Failed to clear cache from executors. ", e)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index 909c196..1554f6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -55,7 +55,7 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
         carbonTable.getTableName)) {
       DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
     } else {
-      val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
+      val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession)
 
       // Extract dictionary keys for the table and create cache keys from those
       val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index b766cec..45e811a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -82,7 +82,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
     Checker
       .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
-    val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+    val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession).size
     val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
     val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled(
       tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 3521c97..ae1d848 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -116,7 +116,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
         HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
           isUpdateOperation = false)
 
-        DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)
+        DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession)
 
         if (executorErrors.failureCauses != FailureCauses.NONE) {
           throw new Exception(executorErrors.errorMsg)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 20a8435..b620e38 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -158,7 +158,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
             executionErrors,
             segmentsToBeDeleted)
 
-          DeleteExecution.clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)
+          DeleteExecution
+            .clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)(sparkSession)
 
         } else {
           throw new ConcurrentOperationException(carbonTable, "compaction", "update")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 3de8dd4..f9428a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -317,13 +317,13 @@ object DeleteExecution {
   }
 
   def clearDistributedSegmentCache(carbonTable: CarbonTable,
-      segmentsToBeCleared: Seq[Segment]): Unit = {
+      segmentsToBeCleared: Seq[Segment])(sparkSession: SparkSession): Unit = {
     if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
       .getDatabaseName, carbonTable.getTableName)) {
       try {
         IndexServer.getClient
           .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
-            .toArray)
+            .toArray, SparkSQLUtil.getTaskGroupId(sparkSession))
       } catch {
         case _: Exception =>
           LOGGER.warn(s"Clearing of invalid segments for ${
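
For context on the Spark mechanism this commit relies on: Spark attributes a job to a
group by reading the "spark.jobGroup.id" and "spark.job.description" local properties
of the submitting thread at the moment an action is launched, which is why the index
server must set both properties before running InvalidateSegmentCacheRDD. A minimal
standalone sketch of that behaviour follows (illustrative only; the app name, group id,
and description below are made up and not part of this commit):

    import org.apache.spark.sql.SparkSession

    object JobGroupSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("job-group-sketch") // made-up name
          .getOrCreate()
        val sc = spark.sparkContext

        // The same two local properties the patch propagates; Spark's
        // sc.setJobGroup(id, desc) sets both of them in a single call.
        sc.setLocalProperty("spark.jobGroup.id", "demo-group") // made-up value
        sc.setLocalProperty("spark.job.description", "demo description")

        // Any action launched from this thread is now listed under
        // "demo-group" in the Spark UI's Jobs page, which is exactly
        // what was missing for the index server jobs before this fix.
        sc.parallelize(1 to 100).count()

        spark.stop()
      }
    }

Note the design choice on the server side: invalidateSegmentCache takes jobGroupId with
a default of "", so existing callers keep compiling while the updated call sites pass
the caller's group id through SparkSQLUtil.getTaskGroupId.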