[CARBONDATA-2553] support ZSTD compression for sort temp file

This closes #2350


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ece06729
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ece06729
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ece06729

Branch: refs/heads/carbonstore
Commit: ece0672930b8bffba8e9bddad63560ff9d6cd582
Parents: 5593d16
Author: Manhua <kevin...@qq.com>
Authored: Tue May 29 09:21:52 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Mon Jun 18 21:31:02 2018 +0800

----------------------------------------------------------------------
 core/pom.xml                                    |  5 ++
 .../core/constants/CarbonCommonConstants.java   |  2 +-
 .../datastore/filesystem/LocalCarbonFile.java   |  8 +++
 .../carbondata/core/util/CarbonProperties.java  |  4 +-
 docs/useful-tips-on-carbondata.md               |  2 +-
 .../TestLoadWithSortTempCompressed.scala        | 51 +++++++++++++++++++-
 6 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 7d87037..c145c3b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -68,6 +68,11 @@
       <version>${snappy.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.3.2-2</version>
+    </dependency>
+    <dependency>
       <groupId>org.jmockit</groupId>
       <artifactId>jmockit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2fcf0f5..355bcb6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1378,7 +1378,7 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SORT_TEMP_COMPRESSOR = 
"carbon.sort.temp.compressor";
 
   /**
-   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4'.
+   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD'.
    * By default, empty means that Carbondata will not compress the sort temp 
files.
    */
   public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 60b7e17..5b6f657 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -42,6 +42,8 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import com.github.luben.zstd.ZstdInputStream;
+import com.github.luben.zstd.ZstdOutputStream;
 import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4BlockOutputStream;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
@@ -290,6 +292,8 @@ public class LocalCarbonFile implements CarbonFile {
       inputStream = new SnappyInputStream(new FileInputStream(path));
     } else if ("LZ4".equalsIgnoreCase(compressor)) {
       inputStream = new LZ4BlockInputStream(new FileInputStream(path));
+    } else if ("ZSTD".equalsIgnoreCase(compressor)) {
+      inputStream = new ZstdInputStream(new FileInputStream(path));
     } else {
       throw new IOException("Unsupported compressor: " + compressor);
     }
@@ -368,6 +372,10 @@ public class LocalCarbonFile implements CarbonFile {
       outputStream = new SnappyOutputStream(new FileOutputStream(path));
     } else if ("LZ4".equalsIgnoreCase(compressor)) {
       outputStream = new LZ4BlockOutputStream(new FileOutputStream(path));
+    } else if ("ZSTD".equalsIgnoreCase(compressor)) {
+      // compression level 1 is cost-effective for sort temp file
+      // which is not used for storage
+      outputStream = new ZstdOutputStream(new FileOutputStream(path), 1);
     } else {
       throw new IOException("Unsupported compressor: " + compressor);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index b134a7c..dc50ab0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1290,11 +1290,11 @@ public final class CarbonProperties {
     String compressor = 
getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
         
CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT).toUpperCase();
     if (compressor.isEmpty() || "SNAPPY".equals(compressor) || 
"GZIP".equals(compressor)
-        || "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
+        || "BZIP2".equals(compressor) || "LZ4".equals(compressor) || 
"ZSTD".equals(compressor)) {
       return compressor;
     } else {
       LOGGER.warn("The 
".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
-          .concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 
and")
+          .concat(" configuration value is invalid. Only snappy, gzip, bip2, 
lz4, zstd and")
           .concat(" empty are allowed. It will not compress the sort temp 
files by default"));
       return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md 
b/docs/useful-tips-on-carbondata.md
index 732d38f..d00f785 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -168,7 +168,7 @@
   | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data 
loading | The buffer size to store records, returned from the block scan. | In 
limit scenario this parameter is very important. For example your query limit 
is 1000. But if we set this value to 3000 that means we get 3000 records from 
scan but spark will only take 1000 rows. So the 2000 remaining are useless. In 
one Finance test case after we set it to 100, in the limit 1000 scenario the 
performance increase about 2 times in comparison to if we set this value to 
12000. |
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | 
Whether to use YARN local directories for multi-table load disk load balance | If 
this is set to true, CarbonData will use YARN local directories for 
multi-table load disk load balance, which will improve the data load 
performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data 
loading | Whether to use multiple YARN local directories during table data 
loading for disk load balance | After enabling 'carbon.use.local.dir', if this 
is set to true, CarbonData will use all YARN local directories during data load 
for disk load balance, that will improve the data load performance. Please 
enable this property when you encounter disk hotspot problem during data 
loading. |
-  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data 
loading | Specify the name of compressor to compress the intermediate sort 
temporary files during sort procedure in data loading. | The optional values 
are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that 
Carbondata will not compress the sort temp files. This parameter will be useful 
if you encounter disk bottleneck. |
+  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data 
loading | Specify the name of compressor to compress the intermediate sort 
temporary files during sort procedure in data loading. | The optional values 
are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means 
that Carbondata will not compress the sort temp files. This parameter will be 
useful if you encounter disk bottleneck. |
   | carbon.load.skewedDataOptimization.enabled | 
spark/carbonlib/carbon.properties | Data loading | Whether to enable size based 
block allocation strategy for data loading. | When loading, carbondata will use 
file size based block allocation strategy for task distribution. It will make 
sure that all the executors process the same size of data -- It's useful if the 
size of your input data files varies widely, say 1MB~1GB. |
   | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data 
loading | Whether to enable node minimum input data size allocation strategy 
for data loading. | When loading, carbondata will use node minimum input data 
size allocation strategy for task distribution. It will make sure the node loads 
the minimum amount of data -- It's useful if the size of your input data files 
is very small, say 1MB~256MB, to avoid generating a large number of small files. |
   

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
index 61acea4..5fbdd14 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
@@ -50,9 +50,8 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
 
   override protected def beforeAll(): Unit = {
-    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
-      "SNAPPY")
   }
+
   override def afterAll(): Unit = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
@@ -84,6 +83,8 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for simple table with sort temp compressed with snappy" 
+
        " and off-heap sort enabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
     testSimpleTable()
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -92,6 +93,28 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for simple table with sort temp compressed with snappy" 
+
        " and off-heap sort disabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "false")
+    testSimpleTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table with sort temp compressed with zstd" +
+       " and off-heap sort enabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
+    testSimpleTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table with sort temp compressed with zstd" +
+       " and off-heap sort disabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "false")
     testSimpleTable()
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -138,6 +161,8 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for complex table with sort temp compressed with 
snappy" +
        " and off-heap sort enabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
     testComplexTable()
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -146,6 +171,28 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for complex table with sort temp compressed with 
snappy" +
        " and off-heap sort disabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "false")
+    testComplexTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for complex table with sort temp compressed with zstd" +
+       " and off-heap sort enabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
+    testComplexTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for complex table with sort temp compressed with zstd" +
+       " and off-heap sort disabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "false")
     testComplexTable()
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,

Reply via email to