Repository: carbondata
Updated Branches:
  refs/heads/master c58eb43ba -> 94973c587
[CARBONDATA-2358] Fix overwrite mode in DataFrame write

Writing data with DataFrame overwrite does not work properly if the table is
already created.

Solution: deleting segments should not be used while overwriting from a
DataFrame; the overwrite must be an atomic operation, so the overwrite option
is passed to the load command internally.

This closes #2186


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/94973c58
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/94973c58
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/94973c58

Branch: refs/heads/master
Commit: 94973c58751f03c9f8c12a9851248733aa26629b
Parents: c58eb43
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Thu Apr 19 10:42:46 2018 +0530
Committer: chenliang613 <chenliang...@huawei.com>
Committed: Sat Apr 21 12:06:19 2018 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala  | 58 +++++++++++++++++++-
 .../apache/carbondata/spark/CarbonOption.scala  |  3 +
 .../spark/sql/CarbonDataFrameWriter.scala       |  2 +-
 .../org/apache/spark/sql/CarbonSource.scala     | 57 +++++++++++++------
 4 files changed, 102 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index a214444..f413b12 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -22,7 +22,8 @@ import java.math.BigDecimal
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, Row, SaveMode}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.scalatest.BeforeAndAfterAll
 
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
@@ -83,6 +84,8 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
     sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
     sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
+    sql("DROP TABLE IF EXISTS carbon_table_df")
+    sql("DROP TABLE IF EXISTS carbon_table_df1")
   }
 
 
@@ -117,6 +120,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     checkAnswer(
       sql("select count(*) from carbon1 where c3 > 500"), Row(31500)
     )
+    sql(s"describe formatted carbon1").show(true)
   }
 
   test("test load dataframe with saving csv uncompressed files") {
@@ -335,6 +339,58 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     assert(sortColumnValue.isEmpty)
   }
 
+  test("test load dataframe while giving already created table") {
+
+    sql(s"create table carbon_table_df(c1 string, c2 string, c3 int) stored by 'carbondata'")
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon_table_df where c3 > 500"), Row(31500)
+    )
+  }
+
+  test("test load dataframe while giving already created table with delete segment") {
+
+    sql(s"create table carbon_table_df1(c1 string, c2 string, c3 int) stored by 'carbondata'")
+    val table = CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df1")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
+      .getTableInfo.getFactTable.equals(table.getTableInfo.getFactTable))
+
+    sql("delete from table carbon_table_df1 where segment.id in (0)")
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df1")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+    assert(CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
+      .getTableInfo.getFactTable.equals(table.getTableInfo.getFactTable))
+    checkAnswer(
+      sql("select count(*) from carbon_table_df1 where c3 > 500"), Row(31500)
+    )
+
+  }
+
   override def afterAll {
     dropTable
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index dfd9567..e854bbe 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -60,5 +60,8 @@ class CarbonOption(options: Map[String, String]) {
 
   def isStreaming: Boolean = options.getOrElse("streaming", "false").toBoolean
 
+  def overwriteEnabled: Boolean =
+    options.getOrElse("overwrite", "false").toBoolean
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 2be89b1..67817c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -56,7 +56,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       null,
       Seq(),
       Map("fileheader" -> header) ++ options.toMap,
-      isOverwriteTable = false,
+      isOverwriteTable = options.overwriteEnabled,
       null,
       Some(dataFrame)).run(sqlContext.sparkSession)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 4ea1ac9..c8d9fe4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
@@ -85,7 +86,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
-    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
+    var newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!newParameters.contains("path"), "'path' should not be specified, " +
@@ -93,23 +94,16 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       "specified when creating CarbonContext")
 
     val options = new CarbonOption(newParameters)
-    val tablePath = new Path(
-      CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession))
-    val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      .exists(tablePath)
+    val isExists = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.tableExists(
+      options.tableName, options.dbName)(sqlContext.sparkSession)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         CarbonException.analysisException(s"table path already exists.")
       case (SaveMode.Overwrite, true) =>
-        val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
-        // In order to overwrite, delete all segments in the table
-        sqlContext.sparkSession.sql(
-          s"""
-             | DELETE FROM TABLE $dbName.${options.tableName}
-             | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
-           """.stripMargin)
+        newParameters += (("overwrite", "true"))
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
+        newParameters += (("overwrite", "true"))
         (true, false)
       case (SaveMode.Append, _) =>
         (false, true)
@@ -119,9 +113,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
     if (doSave) {
       // save data when the save mode is Overwrite.
-      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(newParameters)
+      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(
+        CaseInsensitiveMap[String](newParameters))
     } else if (doAppend) {
-      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(newParameters)
+      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(
+        CaseInsensitiveMap[String](newParameters))
     }
 
     createRelation(sqlContext, newParameters, data.schema)
@@ -134,7 +130,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       dataSchema: StructType): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
-    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
+    val newParameters =
+      CaseInsensitiveMap[String](CarbonScalaUtil.getDeserializedParameters(parameters))
     val dbName: String =
       CarbonEnv.getDatabaseName(newParameters.get("dbName"))(sqlContext.sparkSession)
     val tableOption: Option[String] = newParameters.get("tableName")
@@ -365,3 +362,31 @@ object CarbonSource {
     map.asScala.toMap
   }
 }
+
+/**
+ * Code ported from Apache Spark
+ * Builds a map in which keys are case insensitive. Input map can be accessed for cases where
+ * case-sensitive information is required. The primary constructor is marked private to avoid
+ * nested case-insensitive map creation, otherwise the keys in the original map will become
+ * case-insensitive in this scenario.
+ */
+case class CaseInsensitiveMap[T] (originalMap: Map[String, T]) extends Map[String, T]
+  with Serializable {
+
+  val keyLowerCasedMap = originalMap.map(kv => kv.copy(_1 = kv._1.toLowerCase(Locale.ROOT)))
+
+  override def get(k: String): Option[T] = keyLowerCasedMap.get(k.toLowerCase(Locale.ROOT))
+
+  override def contains(k: String): Boolean =
+    keyLowerCasedMap.contains(k.toLowerCase(Locale.ROOT))
+
+  override def +[B1 >: T](kv: (String, B1)): Map[String, B1] = {
+    new CaseInsensitiveMap(originalMap + kv)
+  }
+
+  override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator
+
+  override def -(key: String): Map[String, T] = {
+    new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
+  }
+}
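
----------------------------------------------------------------------

For reference, a minimal sketch of the behavior this commit fixes, modeled on
the new test above. The SparkSession `spark` and the sample rows are
assumptions for illustration; the table may already exist when the write runs:

  import org.apache.spark.sql.SaveMode
  import spark.implicits._

  // Sample rows; any (String, String, Int) data works here.
  val df = Seq(("a", "b", 1000), ("c", "d", 2000)).toDF("c1", "c2", "c3")

  // With this fix, SaveMode.Overwrite is forwarded to the load command as an
  // internal "overwrite" option, so the overwrite is atomic instead of being
  // preceded by a separate segment-delete statement.
  df.write
    .format("carbondata")
    .option("tableName", "carbon_table_df")
    .option("tempCSV", "false")
    .mode(SaveMode.Overwrite)
    .save()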
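
The ported CaseInsensitiveMap matches keys case-insensitively while keeping
the original map available for case-sensitive access; a small illustrative
sketch (the keys and values below are made up):

  val m = CaseInsensitiveMap[String](Map("tableName" -> "t1", "tempCSV" -> "false"))
  assert(m.get("tablename").contains("t1"))    // lookup ignores key case
  assert(m.contains("TEMPCSV"))                // so does contains
  assert(m.originalMap.contains("tableName"))  // original casing is preserved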