[CARBONDATA-1261] Load data sql add 'header' option

This closes #1133


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0481340d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0481340d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0481340d

Branch: refs/heads/datamap
Commit: 0481340dbef487c84a924f51282b9b30895204c3
Parents: 31a6ec6
Author: QiangCai <david.c...@gmail.com>
Authored: Tue Jul 4 12:11:33 2017 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Tue Jul 11 16:24:58 2017 +0800

----------------------------------------------------------------------
 .../TestLoadDataWithFileHeaderException.scala   | 101 ++++++++++++++++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../execution/command/carbonTableSchema.scala   |  31 +++++-
 .../execution/command/carbonTableSchema.scala   |  32 +++++-
 4 files changed, 162 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
index dbde455..e36969b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
@@ -43,7 +43,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest 
with BeforeAndAfterA
     }
   }
 
-  test("test load data ddl provided  wrong file header exception") {
+  test("test load data ddl provided wrong file header exception") {
     try {
       sql(s"""
            LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' 
into table t3
@@ -56,6 +56,105 @@ class TestLoadDataWithFileHeaderException extends QueryTest 
with BeforeAndAfterA
     }
   }
 
+  test("test load data with wrong header, but without fileheader") {
+    try {
+      sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+           options('header'='abc')
+           """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("'header' option should be either 'true' 
or 'false'"))
+    }
+  }
+
+  test("test load data with wrong header and fileheader") {
+    try {
+      sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' 
into table t3
+         options('header'='', 
'fileheader'='ID,date,country,name,phonetype,serialname,salary')
+         """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("'header' option should be either 'true' 
or 'false'"))
+    }
+  }
+
+  test("test load data with header=false, but without fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' 
into table t3
+         options('header'='False')
+         """)
+  }
+
+  test("test load data with header=false and fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' 
into table t3
+         options('header'='false', 
'fileheader'='ID,date,country,name,phonetype,serialname,salary')
+         """)
+  }
+
+  test("test load data with header=false and wrong fileheader") {
+    try {
+      sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into 
table t3
+        options('header'='false', 
'fileheader'='ID1,date2,country,name,phonetype,serialname,salary')
+        """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("CSV header in DDL is not proper. Column 
names in schema and CSV header are not the same"))
+    }
+  }
+
+  test("test load data with header=true, but without fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+         options('header'='True')
+         """)
+  }
+
+  test("test load data with header=true and fileheader") {
+    try {
+      sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+           options('header'='true', 
'fileheader'='ID,date,country,name,phonetype,serialname,salary')
+           """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("When 'header' option is true, 
'fileheader' option is not required."))
+    }
+  }
+
+  test("test load data with header=true and wrong fileheader") {
+    try {
+      sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+           options('header'='true', 
'fileheader'='ID1,date1,country,name,phonetype,serialname,salary')
+           """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("When 'header' option is true, 
'fileheader' option is not required."))
+    }
+  }
+
+  test("test load data without header and fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+         """)
+  }
+
+  test("test load data without header, but with fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' 
into table t3
+         
options('fileheader'='ID,date,country,name,phonetype,serialname,salary')
+         """)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS t3")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4dbdc8d..7bf9765 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -839,7 +839,7 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", 
"BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", 
"BAD_RECORD_PATH",
       "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", 
"BATCH_SORT_SIZE_INMB",
-      "GLOBAL_SORT_PARTITIONS"
+      "GLOBAL_SORT_PARTITIONS", "HEADER"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 70c8407..44d5efb 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -419,7 +419,7 @@ case class LoadTable(
 
       val delimiter = options.getOrElse("delimiter", ",")
       val quoteChar = options.getOrElse("quotechar", "\"")
-      val fileHeader = options.getOrElse("fileheader", "")
+      var fileHeader = options.getOrElse("fileheader", "")
       val escapeChar = options.getOrElse("escapechar", "\\")
       val commentchar = options.getOrElse("commentchar", "#")
       val columnDict = options.getOrElse("columndict", null)
@@ -441,6 +441,35 @@ case class LoadTable(
       val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
       val globalSortPartitions = options.getOrElse("global_sort_partitions", 
null)
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
+
+      // if there isn't a file header in the csv file and the load sql doesn't 
provide the FILEHEADER option,
+      // we should use the table schema to generate the file header.
+      val headerOption = options.get("header")
+      if (headerOption.isDefined) {
+        // whether the csv file has file header
+        // the default value is true
+        val header = try {
+          headerOption.get.toBoolean
+        } catch {
+          case ex: IllegalArgumentException =>
+            throw new MalformedCarbonCommandException(
+              "'header' option should be either 'true' or 'false'. " + 
ex.getMessage)
+        }
+        header match {
+          case true =>
+            if (fileHeader.nonEmpty) {
+              throw new MalformedCarbonCommandException(
+                "When 'header' option is true, 'fileheader' option is not 
required.")
+            }
+          case false =>
+            // generate file header
+            if (fileHeader.isEmpty) {
+              fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+                .asScala.map(_.getColName).mkString(",")
+            }
+        }
+      }
+
       val bad_record_path = options.getOrElse("bad_record_path",
           
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
             CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 2e5812c..4b22cea 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -537,6 +537,36 @@ case class LoadTable(
       
carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar").get,
 "\\"))
       
carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar").get,
 "\""))
       
carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar").get,
 "#"))
+
+      // if there isn't a file header in the csv file and the load sql doesn't 
provide the FILEHEADER option,
+      // we should use the table schema to generate the file header.
+      var fileHeader = optionsFinal.get("fileheader").get
+      val headerOption = options.get("header")
+      if (headerOption.isDefined) {
+        // whether the csv file has file header
+        // the default value is true
+        val header = try {
+          headerOption.get.toBoolean
+        } catch {
+          case ex: IllegalArgumentException =>
+            throw new MalformedCarbonCommandException(
+              "'header' option should be either 'true' or 'false'. " + 
ex.getMessage)
+        }
+        header match {
+          case true =>
+            if (fileHeader.nonEmpty) {
+              throw new MalformedCarbonCommandException(
+                "When 'header' option is true, 'fileheader' option is not 
required.")
+            }
+          case false =>
+            // generate file header
+            if (fileHeader.isEmpty) {
+              fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+                .asScala.map(_.getColName).mkString(",")
+            }
+        }
+      }
+
       carbonLoadModel.setDateFormat(dateFormat)
       carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -582,7 +612,7 @@ case class LoadTable(
         LOGGER.info(s"Initiating Direct Load for the Table : 
($dbName.$tableName)")
         carbonLoadModel.setFactFilePath(factPath)
         carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-        carbonLoadModel.setCsvHeader(optionsFinal.get("fileheader").get)
+        carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(column_dict)
         carbonLoadModel.setDirectLoad(true)
         
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))

Reply via email to