This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f0ec97  [CARBONDATA-3458] Setting Spark Execution Id to null only for Spark version 2.2 and below.
8f0ec97 is described below

commit 8f0ec970ee0c74007201fd03c750458cb4eba57b
Author: shivamasn <shivamas...@gmail.com>
AuthorDate: Tue Jul 2 15:45:54 2019 +0530

    [CARBONDATA-3458] Setting Spark Execution Id to null only for Spark version 2.2 and below.
    
    Problem: The Spark EXECUTION_ID local property must not be set to null in Spark 2.3.
    
    Solution: Set EXECUTION_ID to null only in Spark version 2.2 and below.
    From Spark 2.3 onwards, EXECUTION_ID is set by Spark itself.
    
    This closes #3313
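    
    For illustration only, not part of the patch: a minimal sketch of the
    version gate this commit introduces, assuming SPARK_VERSION is a
    "major.minor.patch" string such as "2.2.1" or "2.3.2":
    
        import org.apache.spark.SPARK_VERSION
    
        // Parse the major.minor prefix of the running Spark version and
        // clear the execution id only on Spark 2.2 and below.
        val Array(major, minor) = SPARK_VERSION.split("\\.").take(2).map(_.toInt)
        val shouldClearExecutionId = major < 2 || (major == 2 && minor < 3)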
---
 .../src/main/scala/org/apache/spark/util/SparkUtil.scala       | 10 ++++++++++
 .../execution/command/management/CarbonLoadDataCommand.scala   |  7 +++----
 .../sql/execution/command/table/CarbonCreateTableCommand.scala |  4 ++--
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala
index 0b3d35b..ba782e5 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.util
 
 import org.apache.spark.{SPARK_VERSION, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 
 /*
  * this object is used to handle file splits
@@ -57,4 +59,12 @@ object SparkUtil {
     isSparkVersionXandAbove(xVersion, true)
   }
 
+  def setNullExecutionId(sparkSession: SparkSession): Unit = {
+    // "spark.sql.execution.id is already set" exception will be
+    // thrown if not set to null in spark2.2 and below versions
+    if (!SparkUtil.isSparkVersionXandAbove("2.3")) {
+      sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    }
+  }
+
 }
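
The helper calls isSparkVersionXandAbove, whose body lies outside this
hunk. A plausible sketch, assuming it compares the major.minor prefix of
SPARK_VERSION numerically against xVersion (parameter names and the
default value are inferred from the call site above, not confirmed by
this diff):

    import org.apache.spark.SPARK_VERSION

    def isSparkVersionXandAbove(xVersion: String,
        isEqualComparision: Boolean = false): Boolean = {
      // Reduce "2.3.2" (or "2.3.2-SNAPSHOT") to its numeric 2.3 prefix
      val parts = SPARK_VERSION.split("\\.")
      val running = (parts(0) + "." + parts(1)).toFloat
      // Equality check when requested, otherwise greater-or-equal
      if (isEqualComparision) running == xVersion.toFloat
      else running >= xVersion.toFloat
    }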
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 390de33..6a03eab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
@@ -46,7 +45,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil}
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -837,7 +836,7 @@ case class CarbonLoadDataCommand(
           query = logicalPlan,
           overwrite = false,
           ifPartitionNotExists = false)
-      sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+      SparkUtil.setNullExecutionId(sparkSession)
       Dataset.ofRows(sparkSession, convertedPlan)
     } catch {
       case ex: Throwable =>
@@ -1056,7 +1055,7 @@ case class CarbonLoadDataCommand(
       catalogTable: CatalogTable,
       df: DataFrame,
       carbonLoadModel: CarbonLoadModel): LogicalPlan = {
-    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    SparkUtil.setNullExecutionId(sparkSession)
     // In case of update, we don't need the segment id column in case of partitioning
     val dropAttributes = df.logicalPlan.output.dropRight(1)
     val finalOutput = catalogTable.schema.map { attr =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index b5aa8f9..048a1f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -122,7 +122,7 @@ case class CarbonCreateTableCommand(
           val tablePath = tableIdentifier.getTablePath
           val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
           val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
-          sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+          SparkUtil.setNullExecutionId(sparkSession)
           val partitionInfo = tableInfo.getFactTable.getPartitionInfo
           val partitionString =
             if (partitionInfo != null &&
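
Taken together, each call site makes the same one-line swap: the direct
property write is replaced by the version-aware helper. A hypothetical
call site, for illustration only:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.SparkUtil

    def runCommand(sparkSession: SparkSession): Unit = {
      // Before: always cleared the execution id, which breaks on Spark 2.3+
      //   sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
      // After: cleared only on Spark 2.2 and below
      SparkUtil.setNullExecutionId(sparkSession)
      // ... run the actual command ...
    }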
