[CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support 
configuring the storage level when inserting data with 
'carbon.insert.persist.enable'='true'

When inserting data with 'carbon.insert.persist.enable'='true', the storage 
level of the dataset is always 'MEMORY_AND_DISK'.
The storage level should be configurable so that it can be adapted to 
different environments.

This closes #2373


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/181f0ac9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/181f0ac9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/181f0ac9

Branch: refs/heads/carbonstore
Commit: 181f0ac9bed6ff7d83268f6c058aee943b348ddc
Parents: f0c8834
Author: Zhang Zhichao <441586...@qq.com>
Authored: Thu Jun 14 14:48:47 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sat Jun 16 03:32:36 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 25 ++++++++++++++++++++
 .../carbondata/core/util/CarbonProperties.java  | 18 ++++++++++++++
 docs/configuration-parameters.md                |  4 ++++
 .../management/CarbonInsertIntoCommand.scala    |  5 ++--
 4 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c7281dd..19ff494 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -866,6 +866,7 @@ public final class CarbonCommonConstants {
    * to run load and insert queries on source table concurrently then user can 
enable this flag
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_INSERT_PERSIST_ENABLED = 
"carbon.insert.persist.enable";
 
   /**
@@ -875,6 +876,27 @@ public final class CarbonCommonConstants {
   public static final String CARBON_INSERT_PERSIST_ENABLED_DEFAULT = "false";
 
   /**
+   * Which storage level to persist dataset when insert into data
+   * with 'carbon.insert.persist.enable'='true'
+   */
+  @CarbonProperty
+  @InterfaceStability.Evolving
+  public static final String CARBON_INSERT_STORAGE_LEVEL =
+      "carbon.insert.storage.level";
+
+  /**
+   * The default value(MEMORY_AND_DISK) is the same as the default storage 
level of Dataset.
+   * Unlike `RDD.cache()`, the default storage level is set to be 
`MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table 
is expensive.
+   *
+   * if user's executor has less memory, set the CARBON_INSERT_STORAGE_LEVEL
+   * to MEMORY_AND_DISK_SER or other storage level to correspond to different 
environment.
+   * You can get more recommendations about storage level in spark website:
+   * 
http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+   */
+  public static final String CARBON_INSERT_STORAGE_LEVEL_DEFAULT = 
"MEMORY_AND_DISK";
+
+  /**
    * default name of data base
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
@@ -1094,6 +1116,7 @@ public final class CarbonCommonConstants {
    * to determine to use the rdd persist or not.
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String isPersistEnabled = "carbon.update.persist.enable";
 
   /**
@@ -1117,6 +1140,7 @@ public final class CarbonCommonConstants {
    * with 'carbon.update.persist.enable'='true'
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_UPDATE_STORAGE_LEVEL =
       "carbon.update.storage.level";
 
@@ -1354,6 +1378,7 @@ public final class CarbonCommonConstants {
    * Which storage level to persist rdd when sort_scope=global_sort
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL =
       "carbon.global.sort.rdd.storage.level";
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6eb7de6..b134a7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1581,4 +1581,22 @@ public final class CarbonProperties {
       return defaultValue;
     }
   }
+
+  /**
+   * Return valid storage level for CARBON_INSERT_STORAGE_LEVEL
+   * @return String
+   */
+  public String getInsertIntoDatasetStorageLevel() {
+    String storageLevel = 
getProperty(CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL,
+        CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT);
+    boolean validateStorageLevel = 
CarbonUtil.isValidStorageLevel(storageLevel);
+    if (!validateStorageLevel) {
+      LOGGER.warn("The " + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL
+          + " configuration value is invalid. It will use default storage 
level("
+          + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT
+          + ") to persist dataset.");
+      storageLevel = CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT;
+    }
+    return storageLevel.toUpperCase();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 11cc6ea..f81959e 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -55,7 +55,11 @@ This section provides the details of all the configurations 
required for CarbonD
 | carbon.max.driver.lru.cache.size | -1 | Max LRU cache size upto which data 
will be loaded at the driver side. This value is expressed in MB. Default value 
of -1 means there is no memory limit for caching. Only integer values greater 
than 0 are accepted. |  |
 | carbon.max.executor.lru.cache.size | -1 | Max LRU cache size upto which data 
will be loaded at the executor side. This value is expressed in MB. Default 
value of -1 means there is no memory limit for caching. Only integer values 
greater than 0 are accepted. If this parameter is not configured, then the 
carbon.max.driver.lru.cache.size value will be considered. |  |
 | carbon.merge.sort.prefetch | true | Enable prefetch of data during merge 
sort while reading data from sort temp files in data loading. |  |
+| carbon.insert.persist.enable | false | Enabling this parameter considers 
persistent data. If we are executing insert into query from source table using 
select statement & loading the same source table concurrently, when select 
happens on source table during the data load, it gets new record for which 
dictionary is not generated, so there will be inconsistency. To avoid this 
condition we can persist the dataframe into MEMORY_AND_DISK(default value) and 
perform insert into operation. By default this value will be false because no 
need to persist the dataframe in all cases. If user wants to run load and 
insert queries on source table concurrently then user can enable this 
parameter. |  |
+| carbon.insert.storage.level | MEMORY_AND_DISK | Which storage level to 
persist dataframe when 'carbon.insert.persist.enable'=true, if user's executor 
has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage 
level to correspond to different environment. [See 
detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).
 |  |
 | carbon.update.persist.enable | true | Enabling this parameter considers 
persistent data. Enabling this will reduce the execution time of UPDATE 
operation. |  |
+| carbon.update.storage.level | MEMORY_AND_DISK | Which storage level to 
persist dataframe when 'carbon.update.persist.enable'=true, if user's executor 
has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage 
level to correspond to different environment. [See 
detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).
 |  |
+| carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Which storage level to 
persist rdd when loading data with 'sort_scope'='global_sort', if user's 
executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other 
storage level to correspond to different environment. [See 
detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).
 |  |
 | carbon.load.global.sort.partitions | 0 | The Number of partitions to use 
when shuffling data for sort. If user don't configurate or configurate it less 
than 1, it uses the number of map tasks as reduce tasks. In general, we 
recommend 2-3 tasks per CPU core in your cluster.
 | carbon.options.bad.records.logger.enable | false | Whether to create logs 
with details about bad records. | |
 | carbon.bad.records.action | FORCE | This property can have four types of 
actions for bad records FORCE, REDIRECT, IGNORE and FAIL. If set to FORCE then 
it auto-corrects the data by storing the bad records as NULL. If set to 
REDIRECT then bad records are written to the raw CSV instead of being loaded. 
If set to IGNORE then bad records are neither loaded nor written to the raw 
CSV. If set to FAIL then data loading fails if any bad records are found. | |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 702f954..6c74ad2 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -53,8 +53,9 @@ case class CarbonInsertIntoCommand(
     val df =
       if (isPersistRequired) {
         LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child)
-          .persist(StorageLevel.MEMORY_AND_DISK)
+        Dataset.ofRows(sparkSession, child).persist(
+          StorageLevel.fromString(
+            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
       } else {
         Dataset.ofRows(sparkSession, child)
       }

Reply via email to