Repository: carbondata Updated Branches: refs/heads/master 2b704349d -> 526e3bfa1
[CARBONDATA-2907] Support setting blocklet size in table property When creating a table, users should be able to set the blocklet size (property name TABLE_BLOCKLET_SIZE). If the user does not set this table property, the system-level property is used. This closes #2682 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/526e3bfa Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/526e3bfa Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/526e3bfa Branch: refs/heads/master Commit: 526e3bfa1bdc8d93eff5aadd1ab2ad63f082d061 Parents: 2b70434 Author: Jacky Li <jacky.li...@qq.com> Authored: Sun Sep 2 00:32:29 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon Sep 3 22:27:16 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 2 + .../schema/table/TableSchemaBuilder.java | 3 +- docs/data-management-on-carbondata.md | 10 ++- .../TestCreateTableWithBlockletSize.scala | 86 ++++++++++++++++++++ .../apache/carbondata/spark/CarbonOption.scala | 2 + .../carbondata/spark/util/CommonUtil.scala | 25 +++--- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 5 +- .../spark/sql/CarbonDataFrameWriter.scala | 1 + .../writer/v3/CarbonFactDataWriterImplV3.java | 17 ++-- .../sdk/file/CSVCarbonWriterTest.java | 46 ++++++++++- .../apache/carbondata/sdk/file/TestUtil.java | 2 +- 11 files changed, 173 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index d324b2d..3fd2267 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -979,6 +979,8 @@ public final class CarbonCommonConstants { public static final String COLUMN_PROPERTIES = "columnproperties"; // table block size in MB public static final String TABLE_BLOCKSIZE = "table_blocksize"; + // table blocklet size in MB + public static final String TABLE_BLOCKLET_SIZE = "table_blocklet_size"; // set in column level to disable inverted index public static final String NO_INVERTED_INDEX = "no_inverted_index"; // table property name of major compaction size http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 09fc5e6..c8eaa16 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -27,7 +27,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.metadata.datatype.ArrayType; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -117,7 +116,7 @@ public class TableSchemaBuilder { property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize)); } if (blockletSize > 0) { - property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize)); + 
property.put(CarbonCommonConstants.TABLE_BLOCKLET_SIZE, String.valueOf(blockletSize)); } // Adding local dictionary, applicable only for String(dictionary exclude) http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/docs/data-management-on-carbondata.md ---------------------------------------------------------------------- diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md index 0c54535..2cde334 100644 --- a/docs/data-management-on-carbondata.md +++ b/docs/data-management-on-carbondata.md @@ -109,13 +109,21 @@ This tutorial is going to introduce all commands and data operations on CarbonDa - **Table Block Size Configuration** - This command is for setting block size of this table, the default value is 1024 MB and supports a range of 1 MB to 2048 MB. + This property is for setting block size of this table, the default value is 1024 MB and supports a range of 1 MB to 2048 MB. ``` TBLPROPERTIES ('TABLE_BLOCKSIZE'='512') ``` **NOTE:** 512 or 512M both are accepted. + - **Table Blocklet Size Configuration** + + This property is for setting blocklet size of this table, the default value is 64 MB. + + ``` + TBLPROPERTIES ('TABLE_BLOCKLET_SIZE'='32') + ``` + - **Table Compaction Configuration** These properties are table level compaction configurations, if not specified, system level configurations in carbon.properties will be used. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala new file mode 100644 index 0000000..9b37446 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.createTable + +import scala.util.Random + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.reader.CarbonFooterReaderV3 +import org.apache.carbondata.core.util.path.CarbonTablePath + +/** + * Test functionality of create table with blocklet size + */ +class TestCreateTableWithBlockletSize extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("use default") + sql("drop table if exists source") + } + + test("test create table with blocklet size") { + val rdd = sqlContext.sparkContext.parallelize(1 to 1000000) + .map(x => (Random.nextInt(), Random.nextInt().toString)) + sqlContext.createDataFrame(rdd) + .write + .format("carbondata") + .option("table_blocksize", "8") + .option("table_blocklet_size", "3") + .option("tableName", "source") + .save() + + // read footer and verify number of blocklets + val table = CarbonEnv.getCarbonTable(None, "source")(sqlContext.sparkSession) + val folder = FileFactory.getCarbonFile(table.getTablePath) + val files = folder.listFiles(true) + import scala.collection.JavaConverters._ + val dataFiles = files.asScala.filter(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)) + dataFiles.foreach { dataFile => + val fileReader = FileFactory + .getFileHolder(FileFactory.getFileType(dataFile.getPath)) + val buffer = fileReader + .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), dataFile.getSize - 8, 8) + val footerReader = new CarbonFooterReaderV3( + dataFile.getAbsolutePath, + buffer.getLong) + val footer = footerReader.readFooterVersion3 + assertResult(2)(footer.blocklet_index_list.size) + assertResult(2)(footer.blocklet_info_list3.size) + } + sql("drop 
table source") + } + + test("test create table with invalid blocklet size") { + val ex = intercept[MalformedCarbonCommandException] { + sql("CREATE TABLE T1(name String) STORED AS CARBONDATA TBLPROPERTIES('TABLE_BLOCKLET_SIZE'='3X')") + } + assert(ex.getMessage.toLowerCase.contains("invalid table_blocklet_size")) + } + + override def afterAll { + sql("use default") + sql("drop table if exists source") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index 3acfb9c..76b0618 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -58,6 +58,8 @@ class CarbonOption(options: Map[String, String]) { lazy val tableBlockSize: Option[String] = options.get("table_blocksize") + lazy val tableBlockletSize: Option[String] = options.get("table_blocklet_size") + lazy val bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt lazy val bucketColumns: String = options.getOrElse("bucketcolumns", "") http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index e79c63b..c2f805d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -502,30 +502,31 @@ object CommonUtil { } /** - * This method will validate the table block size specified by the user + * This method will validate the table block size and blocklet size specified by the user * - * @param tableProperties + * @param tableProperties table property specified by user + * @param propertyName property name */ - def validateTableBlockSize(tableProperties: Map[String, String]): Unit = { - var tableBlockSize: Integer = 0 - if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) { + def validateSize(tableProperties: Map[String, String], propertyName: String): Unit = { + var size: Integer = 0 + if (tableProperties.get(propertyName).isDefined) { val blockSizeStr: String = - parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE)) + parsePropertyValueStringInMB(tableProperties(propertyName)) try { - tableBlockSize = Integer.parseInt(blockSizeStr) + size = Integer.parseInt(blockSizeStr) } catch { case e: NumberFormatException => - throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " + + throw new MalformedCarbonCommandException(s"Invalid $propertyName value found: " + s"$blockSizeStr, only int value from 1 MB to " + s"2048 MB is supported.") } - if (tableBlockSize < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL || - tableBlockSize > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) { - throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " + + if (size < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL || + size > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) { + throw new MalformedCarbonCommandException(s"Invalid $propertyName value found: " + s"$blockSizeStr, only int value from 1 MB to " + s"2048 MB is supported.") } - tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, blockSizeStr) + tableProperties.put(propertyName, blockSizeStr) } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 63c733b..af466d4 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -421,8 +421,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { partitionColIntersecLongStrCols.mkString(",") } both in partition and long_string_columns which is not allowed.") } - // validate the tableBlockSize from table properties - CommonUtil.validateTableBlockSize(tableProperties) + // validate the block size and blocklet size in table properties + CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE) + CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKLET_SIZE) // validate table level properties for compaction CommonUtil.validateTableLevelCompactionProperties(tableProperties) // validate flat folder property. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index 6ead03a..80d17a2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -85,6 +85,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { "DICTIONARY_EXCLUDE" -> options.dictionaryExclude, "LONG_STRING_COLUMNS" -> options.longStringColumns, "TABLE_BLOCKSIZE" -> options.tableBlockSize, + "TABLE_BLOCKLET_SIZE" -> options.tableBlockletSize, "STREAMING" -> Option(options.isStreaming.toString) ).filter(_._2.isDefined) .map(property => s"'${property._1}' = '${property._2.get}'").mkString(",") http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index f992e44..4fabbe4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import 
org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage; import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; @@ -43,6 +42,9 @@ import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; import org.apache.carbondata.processing.store.TablePage; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.TABLE_BLOCKLET_SIZE; +import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB; +import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE; /** * Below class will be used to write the data in V3 format @@ -68,11 +70,14 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter { public CarbonFactDataWriterImplV3(CarbonFactDataHandlerModel model) { super(model); - blockletSizeThreshold = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, - CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE)) - * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR - * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR; + String blockletSize = + model.getTableSpec().getCarbonTable().getTableInfo().getFactTable().getTableProperties() + .get(TABLE_BLOCKLET_SIZE); + if (blockletSize == null) { + blockletSize = CarbonProperties.getInstance().getProperty( + BLOCKLET_SIZE_IN_MB, BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE); + } + blockletSizeThreshold = Long.parseLong(blockletSize) << 20; if (blockletSizeThreshold > fileSizeInBytes) { blockletSizeThreshold = fileSizeInBytes; LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 78d0240..e71d061 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -20,13 +20,21 @@ package org.apache.carbondata.sdk.file; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider; +import org.apache.carbondata.core.reader.CarbonFooterReaderV3; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.FileFooter3; import org.apache.commons.io.FileUtils; import org.junit.After; @@ -308,10 +316,44 @@ public class CSVCarbonWriterTest { } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); - } - finally { + } finally { FileUtils.deleteDirectory(new File(path)); } } + // validate number of blocklets in one block + @Test + public void testBlocklet() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] 
fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + TestUtil.writeFilesAndVerify(1000000, new Schema(fields), path, new String[]{"name"}, + true, 3, 8, false); + + // read footer and verify number of blocklets + CarbonFile folder = FileFactory.getCarbonFile(path); + List<CarbonFile> files = folder.listFiles(true); + List<CarbonFile> dataFiles = new LinkedList<>(); + for (CarbonFile file : files) { + if (file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT)) { + dataFiles.add(file); + } + } + for (CarbonFile dataFile : dataFiles) { + FileReader fileReader = FileFactory.getFileHolder(FileFactory.getFileType(dataFile.getPath())); + ByteBuffer buffer = fileReader.readByteBuffer(FileFactory.getUpdatedFilePath( + dataFile.getPath()), dataFile.getSize() - 8, 8); + CarbonFooterReaderV3 footerReader = + new CarbonFooterReaderV3(dataFile.getAbsolutePath(), buffer.getLong()); + FileFooter3 footer = footerReader.readFooterVersion3(); + Assert.assertEquals(2, footer.blocklet_index_list.size()); + Assert.assertEquals(2, footer.blocklet_info_list3.size()); + } + FileUtils.deleteDirectory(new File(path)); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/526e3bfa/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java index fddc97b..83026a2 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -111,7 +111,7 @@ public class TestUtil { * @param blockSize blockSize in the file, -1 for default size * @param isTransactionalTable set to true if this is written for Transactional Table. 
*/ - static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, + public static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) { try { CarbonWriterBuilder builder = CarbonWriter.builder()