[CARBONDATA-1964] Fixed bug in setting the bad.records.action parameter using the SET command

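Fixed a bug in setting the bad.records.action parameter using the SET command. The value is now converted to upper case before it is stored in the session parameters (SessionParams) and before it is passed to the load model (DataLoadingUtil).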

This closes #1819


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e3498201
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e3498201
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e3498201

Branch: refs/heads/fgdatamap
Commit: e3498201e084167d3268eb545c2a5ee34269705f
Parents: 8314ea2
Author: Geetika Gupta <geetika.gu...@knoldus.in>
Authored: Wed Jan 17 13:31:56 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Jan 31 11:21:14 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/util/SessionParams.java     |  4 +
 .../sdv/generated/DataLoadingTestCase.scala     |  2 -
 .../badrecordloger/BadRecordActionTest.scala    | 99 ++++++++++++++++++++
 .../carbondata/spark/util/DataLoadingUtil.scala |  2 +-
 4 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index afbd947..ddc7539 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
@@ -104,6 +105,9 @@ public class SessionParams implements Serializable {
       throws InvalidConfigurationException {
     boolean isValidConf = validateKeyValue(key, value);
     if (isValidConf) {
+      if 
(key.equals(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)) {
+        value = value.toUpperCase();
+      }
       if (doAuditing) {
         LOGGER.audit("The key " + key + " with value " + value + " added in 
the session param");
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
index 365547e..8ff47af 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
@@ -731,7 +731,6 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll {
      sql(s"""drop table uniqdata""").collect
   }
 
-
   //Show loads--->Action=Fail--->Logger=False
   test("BadRecord_Dataload_025", Include) {
     dropTable("uniqdata")
@@ -745,7 +744,6 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll {
      sql(s"""drop table uniqdata""").collect
   }
 
-
   //when insert into null data,query table output NullPointerException
   test("HQ_DEFECT_2016111509706", Include) {
      sql(s"""drop table IF EXISTS t_carbn01""").collect

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
new file mode 100644
index 0000000..0249ddf
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -0,0 +1,99 @@
+package org.apache.carbondata.spark.testsuite.badrecordloger
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class BadRecordActionTest extends QueryTest with BeforeAndAfterAll  {
+
+
+  val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+
+  override def beforeAll = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    sql("drop table if exists sales")
+  }
+
+  test("test load for bad_record_action=force") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country 
String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales 
OPTIONS" +
+        "('bad_records_action'='force', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(sql("select count(*) from sales"),
+      Seq(Row(6)))
+
+  }
+
+  test("test load for bad_record_action=FORCE") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country 
String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales 
OPTIONS" +
+        "('bad_records_action'='FORCE', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(sql("select count(*) from sales"),
+      Seq(Row(6)))
+  }
+
+  test("test load for bad_record_action=fail") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country 
String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    intercept[Exception] {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales 
OPTIONS" +
+          "('bad_records_action'='fail', 'DELIMITER'=" +
+          " ',', 'QUOTECHAR'= '\"')")
+    }
+  }
+
+  test("test load for bad_record_action=FAIL") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country 
String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    intercept[Exception] {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales 
OPTIONS" +
+          "('bad_records_action'='FAIL', 'DELIMITER'=" +
+          " ',', 'QUOTECHAR'= '\"')")
+    }
+  }
+
+  test("test load for bad_record_action=ignore") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country 
String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales 
OPTIONS" +
+        "('bad_records_action'='ignore', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(sql("select count(*) from sales"),
+      Seq(Row(2)))
+  }
+
+  test("test load for bad_record_action=IGNORE") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country 
String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales 
OPTIONS" +
+        "('bad_records_action'='IGNORE', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    checkAnswer(sql("select count(*) from sales"),
+      Seq(Row(2)))
+  }
+
+  override def afterAll() = {
+    sql("drop table if exists sales")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3498201/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index b04a58e..5e9f7fe 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -286,7 +286,7 @@ object DataLoadingUtil {
         TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + 
bad_records_logger_enable)
 
     carbonLoadModel.setBadRecordsAction(
-        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + 
bad_records_action)
+        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + 
bad_records_action.toUpperCase)
 
     carbonLoadModel.setIsEmptyDataBadRecord(
         DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +

Reply via email to