This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f13d2b  [CARBONDATA-3242] Move Range_Column into the table level 
properties
9f13d2b is described below

commit 9f13d2b6c7639330a2228f834820131c714bb80b
Author: QiangCai <qiang...@qq.com>
AuthorDate: Thu Jan 10 21:03:14 2019 +0800

    [CARBONDATA-3242] Move Range_Column into the table level properties
    
    Move Range_Column into the table level properties
    
    This closes #3063
---
 .../core/constants/CarbonCommonConstants.java      | 11 ++++
 .../core/constants/CarbonLoadOptionConstants.java  |  6 --
 .../core/metadata/schema/table/CarbonTable.java    | 10 +++
 .../carbondata/core/util/CarbonProperties.java     | 24 +++++++
 .../dataload/TestRangeColumnDataLoad.scala         | 74 ++++++++++++++++++----
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  5 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    | 18 +++++-
 .../command/management/CarbonLoadDataCommand.scala | 37 +++++------
 .../org/apache/spark/util/AlterTableUtil.scala     | 25 +++++++-
 .../loading/model/CarbonLoadModelBuilder.java      |  9 ---
 10 files changed, 170 insertions(+), 49 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c1ef940..ccc8b99 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -426,6 +426,7 @@ public final class CarbonCommonConstants {
    */
   public static final String DICTIONARY_PATH = "dictionary_path";
   public static final String SORT_COLUMNS = "sort_columns";
+  public static final String RANGE_COLUMN = "range_column";
   public static final String PARTITION_TYPE = "partition_type";
   public static final String NUM_PARTITIONS = "num_partitions";
   public static final String RANGE_INFO = "range_info";
@@ -1176,6 +1177,16 @@ public final class CarbonCommonConstants {
    */
   public static final int SORT_SIZE_MIN_VAL = 1000;
 
+  /**
+   * For Range_Column, it will use SCALE_FACTOR to control the size of each partition.
+   * When SCALE_FACTOR equals the compression ratio of CarbonData,
+   * each task will generate one CarbonData file.
+   * And the size of this CarbonData file is about the TABLE_BLOCKSIZE of this table.
+   */
+  public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR = 
"carbon.range.column.scale.factor";
+
+  public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
+
   
//////////////////////////////////////////////////////////////////////////////////////////
   // Query parameter start here
   
//////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index eef2bef..225a8aa 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -172,10 +172,4 @@ public final class CarbonLoadOptionConstants {
 
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT 
= "0";
 
-  /**
-   * For Range_Column, it will use SCALE_FACTOR to control the size of each 
partition.
-   * When SCALE_FACTOR is about the compression ratio, each task will generate 
one CarbonData file.
-   * And the size of the file is about TABLE_BLOCKSIZE of this table.
-   */
-  public static final int CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = 3;
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index daaed9d..f89dd6c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -947,6 +947,16 @@ public class CarbonTable implements Serializable {
     return numberOfNoDictSortColumns;
   }
 
+  public CarbonColumn getRangeColumn() {
+    String rangeColumn =
+        
tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.RANGE_COLUMN);
+    if (rangeColumn == null) {
+      return null;
+    } else {
+      return getColumnByName(getTableName(), rangeColumn);
+    }
+  }
+
   public TableInfo getTableInfo() {
     return tableInfo;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 28f6e75..f9131f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1364,6 +1364,30 @@ public final class CarbonProperties {
     return thresholdSize;
   }
 
+  public int getRangeColumnScaleFactor() {
+    boolean isValid = true;
+    int scaleFactor = 1;
+    try {
+      scaleFactor = 
Integer.parseInt(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR,
+          CarbonCommonConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT));
+      if (scaleFactor < 1 || scaleFactor > 300) {
+        isValid = false;
+      }
+    } catch (NumberFormatException ex) {
+      LOGGER.warn("Range column scale factor is not in number format");
+      isValid = false;
+    }
+
+    if (isValid) {
+      return scaleFactor;
+    } else {
+      LOGGER.warn("The scale factor is invalid. Using the default value "
+          + CarbonCommonConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT);
+      return 
Integer.parseInt(CarbonCommonConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT);
+    }
+  }
+
   /**
    * Get the number of hours the segment lock files will be preserved.
    * It will be converted to microseconds to return.
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index 4803fb2..2caf46c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -25,12 +25,14 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.load.PrimtiveOrdering
 
@@ -50,6 +52,8 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
     sql("DROP TABLE IF EXISTS carbon_range_column2")
     sql("DROP TABLE IF EXISTS carbon_range_column3")
     sql("DROP TABLE IF EXISTS carbon_range_column4")
+    sql("DROP TABLE IF EXISTS carbon_range_column5")
+    sql("DROP TABLE IF EXISTS carbon_range_column6")
   }
 
   test("range_column with option GLOBAL_SORT_PARTITIONS") {
@@ -57,11 +61,11 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
       """
         | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, 
age INT)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, 
city', 'range_column'='name')
       """.stripMargin)
 
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " 
+
-        "OPTIONS('GLOBAL_SORT_PARTITIONS'='1', 'range_column'='name')")
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
 
     assert(getIndexFileCount("carbon_range_column1") === 1)
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column1"), Seq(Row(12)))
@@ -74,11 +78,11 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
       """
         | CREATE TABLE carbon_range_column2(id INT, name STRING, city STRING, 
age INT)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, 
city', 'range_column'='name')
       """.stripMargin)
 
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column2 " 
+
-        "OPTIONS('scale_factor'='10', 'range_column'='name')")
+        "OPTIONS('scale_factor'='10')")
 
     assert(getIndexFileCount("carbon_range_column2") === 1)
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column2"), Seq(Row(12)))
@@ -86,18 +90,31 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
       sql("SELECT * FROM carbon_range_column2 ORDER BY name"))
   }
 
-  test("range_column only support single column ") {
+  test("only support single column for create table") {
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+          | CREATE TABLE carbon_range_column3(id INT, name STRING, city 
STRING, age INT)
+          | STORED BY 'org.apache.carbondata.format'
+          | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, 
city', 'range_column'='name,id')
+        """.stripMargin)
+    }
+    assertResult("range_column not support multiple columns")(ex.getMessage)
+  }
+
+  test("load data command not support range_column") {
     sql(
       """
         | CREATE TABLE carbon_range_column3(id INT, name STRING, city STRING, 
age INT)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, 
city', 'range_column'='name')
       """.stripMargin)
 
-    intercept[InvalidLoadOptionException] {
+    val ex = intercept[MalformedCarbonCommandException] {
       sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column3 
" +
-          "OPTIONS('scale_factor'='10', 'range_column'='name,id')")
+          "OPTIONS('scale_factor'='10', 'range_column'='name')")
     }
+    assertResult("Error: Invalid option(s): range_column")(ex.getMessage)
   }
 
   test("range_column with data skew") {
@@ -105,7 +122,7 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
       """
         | CREATE TABLE carbon_range_column4(c1 int, c2 string)
         | STORED AS carbondata
-        | TBLPROPERTIES('sort_columns'='c1,c2', 'sort_scope'='local_sort')
+        | TBLPROPERTIES('sort_columns'='c1,c2', 'sort_scope'='local_sort', 
'range_column'='c2')
       """.stripMargin)
 
     val dataSkewPath = s"$resourcesPath/range_column"
@@ -113,7 +130,7 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
     sql(
       s"""LOAD DATA LOCAL INPATH '$dataSkewPath'
          | INTO TABLE carbon_range_column4
-         | OPTIONS('FILEHEADER'='c1,c2', 'range_column'='c2', 
'global_sort_partitions'='10')
+         | OPTIONS('FILEHEADER'='c1,c2', 'global_sort_partitions'='10')
         """.stripMargin)
 
     assert(getIndexFileCount("carbon_range_column4") === 9)
@@ -200,6 +217,41 @@ class TestRangeColumnDataLoad extends QueryTest with 
BeforeAndAfterEach with Bef
     }
   }
 
+  test("range_column with system property carbon.range.column.scale.factor") {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR,
+      "10"
+    )
+
+    sql(
+      """
+        | CREATE TABLE carbon_range_column5(id INT, name STRING, city STRING, 
age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, 
city', 'range_column'='name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column5 ")
+
+    assert(getIndexFileCount("carbon_range_column5") === 1)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column5"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM carbon_range_column5"),
+      sql("SELECT * FROM carbon_range_column5 ORDER BY name"))
+  }
+
+  test("set and unset table property: range_column") {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column6(id INT, name STRING, city STRING, 
age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
+      """.stripMargin)
+
+    sql("ALTER TABLE carbon_range_column6 SET 
TBLPROPERTIES('range_column'='city')")
+    sql("ALTER TABLE carbon_range_column6 SET 
TBLPROPERTIES('range_column'='name')")
+    sql("ALTER TABLE carbon_range_column6 UNSET TBLPROPERTIES('range_column')")
+    sql("ALTER TABLE carbon_range_column6 SET 
TBLPROPERTIES('range_column'='name')")
+  }
+
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): 
Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
tableName)
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
segmentNo)
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index ec1153a..a5d354a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -304,9 +304,8 @@ object DataLoadProcessBuilderOnSpark {
         val blockSize = 1024L * 1024 * table.getBlockSizeInMB
         val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
         val scaleFactor = if (model.getScaleFactor == 0) {
-          // here it assumes the compression ratio of CarbonData is about 30%,
-          // so it multiply by 3 to get the split size of CSV files.
-          CarbonLoadOptionConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT
+          // use system properties
+          CarbonProperties.getInstance().getRangeColumnScaleFactor
         } else {
           model.getScaleFactor
         }
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index a669931..523d59c 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -787,6 +787,23 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       }
     }
 
+    // range_column should be there in create table cols
+    if (tableProperties.get(CarbonCommonConstants.RANGE_COLUMN).isDefined) {
+      val rangeColumn = 
tableProperties.get(CarbonCommonConstants.RANGE_COLUMN).get.trim
+      if (rangeColumn.contains(",")) {
+        val errorMsg = "range_column not support multiple columns"
+        throw new MalformedCarbonCommandException(errorMsg)
+      }
+      val rangeField = fields.find(_.column.equalsIgnoreCase(rangeColumn))
+      if (rangeField.isEmpty) {
+        val errorMsg = "range_column: " + rangeColumn +
+                       " does not exist in table. Please check the create 
table statement."
+        throw new MalformedCarbonCommandException(errorMsg)
+      } else {
+        tableProperties.put(CarbonCommonConstants.RANGE_COLUMN, 
rangeField.get.column)
+      }
+    }
+
     // All excluded cols should be there in create table cols
     if 
(tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       LOGGER.warn("dictionary_exclude option was deprecated, " +
@@ -1104,7 +1121,6 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       "SKIP_EMPTY_LINE",
       "SORT_COLUMN_BOUNDS",
       "LOAD_MIN_SIZE_INMB",
-      "RANGE_COLUMN",
       "SCALE_FACTOR",
       "SORT_SCOPE"
     )
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 95c0767..0030156 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -219,24 +219,25 @@ case class CarbonLoadDataCommand(
               carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
                 CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
 
-      optionsFinal
-        .put("bad_record_path", 
CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
-      val factPath = if (dataFrame.isDefined) {
-        ""
-      } else {
-        FileUtils.getPaths(factPathFromUser, hadoopConf)
-      }
-      carbonLoadModel.setParentTablePath(parentTablePath)
-      carbonLoadModel.setFactFilePath(factPath)
-      
carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable)
-      carbonLoadModel.setAggLoadRequest(
-        internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL,
-          CarbonCommonConstants.IS_INTERNAL_LOAD_CALL_DEFAULT).toBoolean)
-      
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
-      val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
-        .getOrElse(CarbonCommonConstants.COMPRESSOR,
-          CompressorFactory.getInstance().getCompressor.getName)
-      carbonLoadModel.setColumnCompressor(columnCompressor)
+    optionsFinal
+      .put("bad_record_path", 
CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
+    val factPath = if (dataFrame.isDefined) {
+      ""
+    } else {
+      FileUtils.getPaths(factPathFromUser, hadoopConf)
+    }
+    carbonLoadModel.setParentTablePath(parentTablePath)
+    carbonLoadModel.setFactFilePath(factPath)
+    
carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable)
+    carbonLoadModel.setAggLoadRequest(
+      internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL,
+        CarbonCommonConstants.IS_INTERNAL_LOAD_CALL_DEFAULT).toBoolean)
+    
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+    val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
+    carbonLoadModel.setRangePartitionColumn(table.getRangeColumn)
 
     val javaPartition = mutable.Map[String, String]()
     partition.foreach { case (k, v) =>
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 2ca834d..045d2d5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -357,6 +357,9 @@ object AlterTableUtil {
       // validate the load min size properties
       validateLoadMinSizeProperties(carbonTable, lowerCasePropertiesMap)
 
+      // validate the range column properties
+      validateRangeColumnProperties(carbonTable, lowerCasePropertiesMap)
+
       // below map will be used for cache invalidation. As tblProperties map 
is getting modified
       // in the next few steps the original map need to be retained for any 
decision making
       val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
@@ -428,7 +431,8 @@ object AlterTableUtil {
       "LOCAL_DICTIONARY_THRESHOLD",
       "LOCAL_DICTIONARY_INCLUDE",
       "LOCAL_DICTIONARY_EXCLUDE",
-      "LOAD_MIN_SIZE_INMB")
+      "LOAD_MIN_SIZE_INMB",
+      "RANGE_COLUMN")
     supportedOptions.contains(propKey.toUpperCase)
   }
 
@@ -511,6 +515,25 @@ object AlterTableUtil {
     }
   }
 
+  def validateRangeColumnProperties(carbonTable: CarbonTable,
+      propertiesMap: mutable.Map[String, String]): Unit = {
+    if (propertiesMap.get(CarbonCommonConstants.RANGE_COLUMN).isDefined) {
+      val rangeColumnProp = 
propertiesMap.get(CarbonCommonConstants.RANGE_COLUMN).get
+      if (rangeColumnProp.contains(",")) {
+        val errorMsg = "range_column not support multiple columns"
+        throw new MalformedCarbonCommandException(errorMsg)
+      }
+      val rangeColumn = carbonTable.getColumnByName(carbonTable.getTableName, 
rangeColumnProp)
+      if (rangeColumn == null) {
+        throw new MalformedCarbonCommandException(
+          s"Table property ${ CarbonCommonConstants.RANGE_COLUMN }: ${ 
rangeColumnProp }" +
+          s" does not exist in the table")
+      } else {
+        propertiesMap.put(CarbonCommonConstants.RANGE_COLUMN, 
rangeColumn.getColName)
+      }
+    }
+  }
+
   /**
    * This method will validate if there is any complex type column in the 
columns to be cached
    *
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 739776c..eb3c253 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -306,15 +306,6 @@ public class CarbonLoadModelBuilder {
 
   private void validateRangeColumn(Map<String, String> optionsFinal,
       CarbonLoadModel carbonLoadModel) throws InvalidLoadOptionException {
-    String rangeColumn = optionsFinal.get("range_column");
-    if (rangeColumn != null) {
-      carbonLoadModel
-          .setRangePartitionColumn(table.getColumnByName(table.getTableName(), 
rangeColumn));
-      if (carbonLoadModel.getRangePartitionColumn() == null) {
-        throw new InvalidLoadOptionException("Invalid range_column option");
-      }
-    }
-
     String scaleFactor = optionsFinal.get("scale_factor");
     if (scaleFactor != null) {
       try {

Reply via email to