[CARBONDATA-1964] Fixed bug in setting the bad.records.action parameter using the SET command
Fixed bug to set bad.records.action parameter using SET command This closes #1819 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e3498201 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e3498201 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e3498201 Branch: refs/heads/fgdatamap Commit: e3498201e084167d3268eb545c2a5ee34269705f Parents: 8314ea2 Author: Geetika Gupta <geetika.gu...@knoldus.in> Authored: Wed Jan 17 13:31:56 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Wed Jan 31 11:21:14 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/util/SessionParams.java | 4 + .../sdv/generated/DataLoadingTestCase.scala | 2 - .../badrecordloger/BadRecordActionTest.scala | 99 ++++++++++++++++++++ .../carbondata/spark/util/DataLoadingUtil.scala | 2 +- 4 files changed, 104 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index afbd947..ddc7539 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import 
org.apache.carbondata.core.exception.InvalidConfigurationException; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION; @@ -104,6 +105,9 @@ public class SessionParams implements Serializable { throws InvalidConfigurationException { boolean isValidConf = validateKeyValue(key, value); if (isValidConf) { + if (key.equals(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)) { + value = value.toUpperCase(); + } if (doAuditing) { LOGGER.audit("The key " + key + " with value " + value + " added in the session param"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala index 365547e..8ff47af 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala @@ -731,7 +731,6 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""drop table uniqdata""").collect } - //Show loads--->Action=Fail--->Logger=False test("BadRecord_Dataload_025", Include) { dropTable("uniqdata") @@ -745,7 +744,6 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""drop table uniqdata""").collect } - //when insert into null data,query table output NullPointerException test("HQ_DEFECT_2016111509706", Include) { sql(s"""drop table IF EXISTS t_carbn01""").collect 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala new file mode 100644 index 0000000..0249ddf --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala @@ -0,0 +1,99 @@ +package org.apache.carbondata.spark.testsuite.badrecordloger + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class BadRecordActionTest extends QueryTest with BeforeAndAfterAll { + + + val csvFilePath = s"$resourcesPath/badrecords/datasample.csv" + + override def beforeAll = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + + sql("drop table if exists sales") + } + + test("test load for bad_record_action=force") { + sql("drop table if exists sales") + sql( + """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + + "('bad_records_action'='force', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + checkAnswer(sql("select count(*) from sales"), + Seq(Row(6))) + + } + + test("test load for bad_record_action=FORCE") { + sql("drop table if exists sales") + sql( + """CREATE TABLE IF NOT EXISTS sales(ID 
BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + + "('bad_records_action'='FORCE', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + checkAnswer(sql("select count(*) from sales"), + Seq(Row(6))) + } + + test("test load for bad_record_action=fail") { + sql("drop table if exists sales") + sql( + """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + intercept[Exception] { + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + + "('bad_records_action'='fail', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + } + } + + test("test load for bad_record_action=FAIL") { + sql("drop table if exists sales") + sql( + """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + intercept[Exception] { + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + + "('bad_records_action'='FAIL', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + } + } + + test("test load for bad_record_action=ignore") { + sql("drop table if exists sales") + sql( + """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + + "('bad_records_action'='ignore', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + checkAnswer(sql("select count(*) from sales"), + Seq(Row(2))) + } + + test("test load for bad_record_action=IGNORE") { + sql("drop table if exists sales") + sql( + """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price 
Decimal(19,2)) STORED BY 'carbondata'""") + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + + "('bad_records_action'='IGNORE', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + checkAnswer(sql("select count(*) from sales"), + Seq(Row(2))) + } + + override def afterAll() = { + sql("drop table if exists sales") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index b04a58e..5e9f7fe 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -286,7 +286,7 @@ object DataLoadingUtil { TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) carbonLoadModel.setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action) + TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase) carbonLoadModel.setIsEmptyDataBadRecord( DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +