Repository: carbondata
Updated Branches:
  refs/heads/master c58eb43ba -> 94973c587


[CARBONDATA-2358] Fix overwrite mode in dataframe write data

DataFrame overwrite does not work properly if the table is already
created. Solution: segments should not be deleted while overwriting a
DataFrame; the operation should be atomic, so the overwrite option is now
passed to the load command internally.
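
A rough illustration of the intended behaviour (the table name matches the
new test; df stands for any DataFrame whose schema matches the table):

  import org.apache.spark.sql.SaveMode

  // Overwrite an already created Carbon table through the DataFrame API.
  // With this change the overwrite flag is forwarded to the internal load
  // command instead of deleting all segments first.
  df.write
    .format("carbondata")
    .option("tableName", "carbon_table_df")  // table assumed to exist already
    .mode(SaveMode.Overwrite)
    .save()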

This closes #2186


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/94973c58
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/94973c58
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/94973c58

Branch: refs/heads/master
Commit: 94973c58751f03c9f8c12a9851248733aa26629b
Parents: c58eb43
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Thu Apr 19 10:42:46 2018 +0530
Committer: chenliang613 <chenliang...@huawei.com>
Committed: Sat Apr 21 12:06:19 2018 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala  | 58 +++++++++++++++++++-
 .../apache/carbondata/spark/CarbonOption.scala  |  3 +
 .../spark/sql/CarbonDataFrameWriter.scala       |  2 +-
 .../org/apache/spark/sql/CarbonSource.scala     | 57 +++++++++++++------
 4 files changed, 102 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index a214444..f413b12 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -22,7 +22,8 @@ import java.math.BigDecimal
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, Row, SaveMode}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.scalatest.BeforeAndAfterAll
 
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
@@ -83,6 +84,8 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
     sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
     sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
+    sql("DROP TABLE IF EXISTS carbon_table_df")
+    sql("DROP TABLE IF EXISTS carbon_table_df1")
   }
 
 
@@ -117,6 +120,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     checkAnswer(
       sql("select count(*) from carbon1 where c3 > 500"), Row(31500)
     )
+    sql(s"describe formatted carbon1").show(true)
   }
 
   test("test load dataframe with saving csv uncompressed files") {
@@ -335,6 +339,58 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     assert(sortColumnValue.isEmpty)
   }
 
+  test("test load dataframe while giving already created table") {
+
+    sql(s"create table carbon_table_df(c1 string, c2 string, c3 int) stored by 
'carbondata'")
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon_table_df where c3 > 500"), Row(31500)
+    )
+  }
+
+  test("test load dataframe while giving already created table with delete 
segment") {
+
+    sql(s"create table carbon_table_df1(c1 string, c2 string, c3 int) stored 
by 'carbondata'")
+    val table = CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df1")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
+      .getTableInfo.getFactTable.equals(table.getTableInfo.getFactTable))
+
+    sql("delete from table carbon_table_df1 where segment.id in (0)")
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon_table_df1")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+    assert(CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
+      .getTableInfo.getFactTable.equals(table.getTableInfo.getFactTable))
+    checkAnswer(
+      sql("select count(*) from carbon_table_df1 where c3 > 500"), Row(31500)
+    )
+
+  }
+
   override def afterAll {
     dropTable
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index dfd9567..e854bbe 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -60,5 +60,8 @@ class CarbonOption(options: Map[String, String]) {
   def isStreaming: Boolean =
     options.getOrElse("streaming", "false").toBoolean
 
+  def overwriteEnabled: Boolean =
+    options.getOrElse("overwrite", "false").toBoolean
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 2be89b1..67817c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -56,7 +56,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       null,
       Seq(),
       Map("fileheader" -> header) ++ options.toMap,
-      isOverwriteTable = false,
+      isOverwriteTable = options.overwriteEnabled,
       null,
       Some(dataFrame)).run(sqlContext.sparkSession)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94973c58/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 4ea1ac9..c8d9fe4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
@@ -85,7 +86,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
-    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
+    var newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!newParameters.contains("path"), "'path' should not be specified, 
" +
@@ -93,23 +94,16 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating 
CarbonContext")
 
     val options = new CarbonOption(newParameters)
-    val tablePath = new Path(
-      CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession))
-    val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      .exists(tablePath)
+    val isExists = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.tableExists(
+      options.tableName, options.dbName)(sqlContext.sparkSession)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         CarbonException.analysisException(s"table path already exists.")
       case (SaveMode.Overwrite, true) =>
-        val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
-        // In order to overwrite, delete all segments in the table
-        sqlContext.sparkSession.sql(
-          s"""
-             | DELETE FROM TABLE $dbName.${options.tableName}
-             | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
-           """.stripMargin)
+        newParameters += (("overwrite", "true"))
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
+        newParameters += (("overwrite", "true"))
         (true, false)
       case (SaveMode.Append, _) =>
         (false, true)
@@ -119,9 +113,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
     if (doSave) {
       // save data when the save mode is Overwrite.
-      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(newParameters)
+      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(
+        CaseInsensitiveMap[String](newParameters))
     } else if (doAppend) {
-      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(newParameters)
+      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(
+        CaseInsensitiveMap[String](newParameters))
     }
 
     createRelation(sqlContext, newParameters, data.schema)
@@ -134,7 +130,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       dataSchema: StructType): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
-    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
+    val newParameters =
+      CaseInsensitiveMap[String](CarbonScalaUtil.getDeserializedParameters(parameters))
     val dbName: String =
       CarbonEnv.getDatabaseName(newParameters.get("dbName"))(sqlContext.sparkSession)
     val tableOption: Option[String] = newParameters.get("tableName")
@@ -365,3 +362,31 @@ object CarbonSource {
     map.asScala.toMap
   }
 }
+
+/**
+ * Code ported from Apache Spark
+ * Builds a map in which keys are case insensitive. Input map can be accessed for cases where
+ * case-sensitive information is required. The primary constructor is marked private to avoid
+ * nested case-insensitive map creation, otherwise the keys in the original map will become
+ * case-insensitive in this scenario.
+ */
+case class CaseInsensitiveMap[T] (originalMap: Map[String, T]) extends Map[String, T]
+  with Serializable {
+
+  val keyLowerCasedMap = originalMap.map(kv => kv.copy(_1 = kv._1.toLowerCase(Locale.ROOT)))
+
+  override def get(k: String): Option[T] = keyLowerCasedMap.get(k.toLowerCase(Locale.ROOT))
+
+  override def contains(k: String): Boolean =
+    keyLowerCasedMap.contains(k.toLowerCase(Locale.ROOT))
+
+  override def +[B1 >: T](kv: (String, B1)): Map[String, B1] = {
+    new CaseInsensitiveMap(originalMap + kv)
+  }
+
+  override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator
+
+  override def -(key: String): Map[String, T] = {
+    new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
+  }
+}
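
For reference, a rough usage sketch of the ported CaseInsensitiveMap (option
values are illustrative, not taken from the patch):

  val opts = CaseInsensitiveMap[String](Map("tableName" -> "carbon_table_df", "tempCSV" -> "false"))
  opts.get("tablename")          // Some("carbon_table_df"): lookups ignore key case
  opts.contains("TEMPCSV")       // true: key matching is case-insensitive
  opts.originalMap("tableName")  // "carbon_table_df": original casing is still accessible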
