[CARBONDATA-1261] Load data SQL: add 'header' option. This closes #1133
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0481340d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0481340d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0481340d Branch: refs/heads/datamap Commit: 0481340dbef487c84a924f51282b9b30895204c3 Parents: 31a6ec6 Author: QiangCai <david.c...@gmail.com> Authored: Tue Jul 4 12:11:33 2017 +0800 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Jul 11 16:24:58 2017 +0800 ---------------------------------------------------------------------- .../TestLoadDataWithFileHeaderException.scala | 101 ++++++++++++++++++- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 +- .../execution/command/carbonTableSchema.scala | 31 +++++- .../execution/command/carbonTableSchema.scala | 32 +++++- 4 files changed, 162 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala index dbde455..e36969b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala @@ -43,7 +43,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA } } - test("test load data ddl provided wrong file header 
exception") { + test("test load data ddl provided wrong file header exception") { try { sql(s""" LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 @@ -56,6 +56,105 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA } } + test("test load data with wrong header , but without fileheader") { + try { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3 + options('header'='abc') + """) + assert(false) + } catch { + case e: Exception => + assert(e.getMessage.contains("'header' option should be either 'true' or 'false'")) + } + } + + test("test load data with wrong header and fileheader") { + try { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 + options('header'='', 'fileheader'='ID,date,country,name,phonetype,serialname,salary') + """) + assert(false) + } catch { + case e: Exception => + assert(e.getMessage.contains("'header' option should be either 'true' or 'false'")) + } + } + + test("test load data with header=false, but without fileheader") { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 + options('header'='False') + """) + } + + test("test load data with header=false and fileheader") { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 + options('header'='false', 'fileheader'='ID,date,country,name,phonetype,serialname,salary') + """) + } + + test("test load data with header=false and wrong fileheader") { + try { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 + options('header'='false', 'fileheader'='ID1,date2,country,name,phonetype,serialname,salary') + """) + assert(false) + } catch { + case e: Exception => + assert(e.getMessage.contains("CSV header in DDL is not proper. 
Column names in schema and CSV header are not the same")) + } + } + + test("test load data with header=true, but without fileheader") { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3 + options('header'='True') + """) + } + + test("test load data with header=true and fileheader") { + try { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3 + options('header'='true', 'fileheader'='ID,date,country,name,phonetype,serialname,salary') + """) + assert(false) + } catch { + case e: Exception => + assert(e.getMessage.contains("When 'header' option is true, 'fileheader' option is not required.")) + } + } + + test("test load data with header=true and wrong fileheader") { + try { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3 + options('header'='true', 'fileheader'='ID1,date1,country,name,phonetype,serialname,salary') + """) + assert(false) + } catch { + case e: Exception => + assert(e.getMessage.contains("When 'header' option is true, 'fileheader' option is not required.")) + } + } + + test("test load data without header and fileheader") { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3 + """) + } + + test("test load data without header, but with fileheader") { + sql(s""" + LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 + options('fileheader'='ID,date,country,name,phonetype,serialname,salary') + """) + } + override def afterAll { sql("DROP TABLE IF EXISTS t3") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 4dbdc8d..7bf9765 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -839,7 +839,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION", "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH", "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB", - "GLOBAL_SORT_PARTITIONS" + "GLOBAL_SORT_PARTITIONS", "HEADER" ) var isSupported = true val invalidOptions = StringBuilder.newBuilder http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 70c8407..44d5efb 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -419,7 +419,7 @@ case class LoadTable( val delimiter = options.getOrElse("delimiter", ",") val quoteChar = options.getOrElse("quotechar", "\"") - val fileHeader = options.getOrElse("fileheader", "") + var fileHeader = options.getOrElse("fileheader", "") val escapeChar = options.getOrElse("escapechar", "\\") val commentchar = options.getOrElse("commentchar", "#") val columnDict = options.getOrElse("columndict", null) @@ -441,6 +441,35 @@ case class LoadTable( val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null) val globalSortPartitions = options.getOrElse("global_sort_partitions", null) 
ValidateUtil.validateGlobalSortPartitions(globalSortPartitions) + + // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option, + // we should use table schema to generate file header. + val headerOption = options.get("header") + if (headerOption.isDefined) { + // whether the csv file has file header + // the default value is true + val header = try { + headerOption.get.toBoolean + } catch { + case ex: IllegalArgumentException => + throw new MalformedCarbonCommandException( + "'header' option should be either 'true' or 'false'. " + ex.getMessage) + } + header match { + case true => + if (fileHeader.nonEmpty) { + throw new MalformedCarbonCommandException( + "When 'header' option is true, 'fileheader' option is not required.") + } + case false => + // generate file header + if (fileHeader.isEmpty) { + fileHeader = table.getCreateOrderColumn(table.getFactTableName) + .asScala.map(_.getColName).mkString(",") + } + } + } + val bad_record_path = options.getOrElse("bad_record_path", CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/0481340d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 2e5812c..4b22cea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -537,6 +537,36 @@ case class LoadTable( carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar").get, "\\")) 
carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar").get, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar").get, "#")) + + // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option, + // we should use table schema to generate file header. + var fileHeader = optionsFinal.get("fileheader").get + val headerOption = options.get("header") + if (headerOption.isDefined) { + // whether the csv file has file header + // the default value is true + val header = try { + headerOption.get.toBoolean + } catch { + case ex: IllegalArgumentException => + throw new MalformedCarbonCommandException( + "'header' option should be either 'true' or 'false'. " + ex.getMessage) + } + header match { + case true => + if (fileHeader.nonEmpty) { + throw new MalformedCarbonCommandException( + "When 'header' option is true, 'fileheader' option is not required.") + } + case false => + // generate file header + if (fileHeader.isEmpty) { + fileHeader = table.getCreateOrderColumn(table.getFactTableName) + .asScala.map(_.getColName).mkString(",") + } + } + } + carbonLoadModel.setDateFormat(dateFormat) carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, @@ -582,7 +612,7 @@ case class LoadTable( LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") carbonLoadModel.setFactFilePath(factPath) carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) - carbonLoadModel.setCsvHeader(optionsFinal.get("fileheader").get) + carbonLoadModel.setCsvHeader(fileHeader) carbonLoadModel.setColDictFilePath(column_dict) carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))