This is an automated email from the ASF dual-hosted git repository. kunalkapoor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 8f0ec97 [CARBONDATA-3458] Setting Spark Execution Id to null only for Spark version 2.2 and below. 8f0ec97 is described below commit 8f0ec970ee0c74007201fd03c750458cb4eba57b Author: shivamasn <shivamas...@gmail.com> AuthorDate: Tue Jul 2 15:45:54 2019 +0530 [CARBONDATA-3458] Setting Spark Execution Id to null only for Spark version 2.2 and below. Problem: Spark Execution_ID should not be set to null in Spark 2.3. Solution: Set EXECUTION_ID to null in spark version 2.2 and below. In 2.3 version, EXECUTION_ID is to be set by spark code. This closes #3313 --- .../src/main/scala/org/apache/spark/util/SparkUtil.scala | 10 ++++++++++ .../execution/command/management/CarbonLoadDataCommand.scala | 7 +++---- .../sql/execution/command/table/CarbonCreateTableCommand.scala | 4 ++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala index 0b3d35b..ba782e5 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/util/SparkUtil.scala @@ -18,6 +18,8 @@ package org.apache.spark.util import org.apache.spark.{SPARK_VERSION, TaskContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY /* * this object use to handle file splits @@ -57,4 +59,12 @@ object SparkUtil { isSparkVersionXandAbove(xVersion, true) } + def setNullExecutionId(sparkSession: SparkSession): Unit = { + // "spark.sql.execution.id is already set" exception will be + // thrown if not set to null in spark2.2 and below versions + if (!SparkUtil.isSparkVersionXandAbove("2.3")) { + sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + } + } + } diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 390de33..6a03eab 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} import org.apache.spark.sql.execution.LogicalRDD -import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel} import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat} import org.apache.spark.sql.hive.CarbonRelation @@ -46,7 +45,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} +import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil} import org.apache.carbondata.common.Strings import org.apache.carbondata.common.logging.LogServiceFactory @@ -837,7 +836,7 @@ case class CarbonLoadDataCommand( query = logicalPlan, overwrite = false, ifPartitionNotExists = false) - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + SparkUtil.setNullExecutionId(sparkSession) Dataset.ofRows(sparkSession, convertedPlan) } catch { case ex: Throwable => @@ -1056,7 +1055,7 @@ case class 
CarbonLoadDataCommand( catalogTable: CatalogTable, df: DataFrame, carbonLoadModel: CarbonLoadModel): LogicalPlan = { - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + SparkUtil.setNullExecutionId(sparkSession) // In case of update, we don't need the segment id column in case of partitioning val dropAttributes = df.logicalPlan.output.dropRight(1) val finalOutput = catalogTable.schema.map { attr => diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index b5aa8f9..048a1f6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -22,8 +22,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY import org.apache.spark.sql.execution.command.MetadataCommand +import org.apache.spark.util.SparkUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -122,7 +122,7 @@ case class CarbonCreateTableCommand( val tablePath = tableIdentifier.getTablePath val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath) val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation) - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + SparkUtil.setNullExecutionId(sparkSession) val partitionInfo = tableInfo.getFactTable.getPartitionInfo val partitionString = if (partitionInfo != null &&