Added batch sort and sort_scope to load options, and added test cases
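In short: this change replaces the boolean carbon.load.use.batch.sort property with a three-valued carbon.load.sort.scope (NO_SORT, BATCH_SORT or LOCAL_SORT), and also exposes it per load through the new SORT_SCOPE and BATCH_SORT_SIZE_INMB load options. A minimal usage sketch mirroring the new tests below (assumes a CarbonData-enabled Spark session as in QueryTest; the table name and CSV path are hypothetical placeholders):

    // Per-load: pass the new options on LOAD DATA
    sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE t " +
      "OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")

    // Globally: set the carbon properties; loads without an explicit
    // SORT_SCOPE option fall back to these values
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "1")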
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d734f530 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d734f530 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d734f530 Branch: refs/heads/branch-1.1 Commit: d734f53006308a675af30acefa798c814ada3329 Parents: 211c23b Author: ravipesala <ravi.pes...@gmail.com> Authored: Thu May 11 23:54:30 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 12:56:20 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 10 +- .../carbondata/hadoop/CarbonInputSplit.java | 16 +- .../dataload/TestBatchSortDataLoad.scala | 230 +++++++++++++++++++ .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 11 +- .../execution/command/carbonTableSchema.scala | 4 + .../execution/command/carbonTableSchema.scala | 4 + .../DataLoadFailAllTypeSortTest.scala | 27 ++- .../processing/model/CarbonLoadModel.java | 30 ++- .../newflow/DataLoadProcessBuilder.java | 12 +- .../newflow/sort/SortScopeOptions.java | 63 +++++ .../processing/newflow/sort/SorterFactory.java | 7 +- .../newflow/sort/unsafe/UnsafeSortDataRows.java | 5 +- .../sortandgroupby/sortdata/SortParameters.java | 13 ++ .../util/CarbonDataProcessorUtil.java | 51 ++++ 14 files changed, 449 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 269a75f..e1f3e9d 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1104,15 +1104,15 @@ public final class CarbonCommonConstants { /** * Sorts the data in batches and writes the batch data to store with index file. */ - public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort"; + public static final String LOAD_SORT_SCOPE = "carbon.load.sort.scope"; /** - * If set to true, the sorting scope is smaller and more index tree will be created, + * If set to BATCH_SORT, the sorting scope is smaller and more index trees will be created, thus loading is faster but queries may be slower. - * If set to false, the sorting scope is bigger and one index tree per data node will be created, - thus loading is slower but query is faster. + * If set to LOCAL_SORT, the sorting scope is bigger and one index tree per data node will be + created, thus loading is slower but queries are faster. 
*/ - public static final String LOAD_USE_BATCH_SORT_DEFAULT = "false"; + public static final String LOAD_SORT_SCOPE_DEFAULT = "LOCAL_SORT"; /** * Size of batch data to keep in memory, as a thumb rule it supposed http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 0dcaba2..08661a2 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.Distributable; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.internal.index.Block; @@ -84,7 +85,11 @@ public class CarbonInputSplit extends FileSplit ColumnarFormatVersion version) { super(path, start, length, locations); this.segmentId = segmentId; - this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); + String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); + if (taskNo.contains("_")) { + taskNo = taskNo.split("_")[0]; + } + this.taskId = taskNo; this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName()); this.invalidSegments = new ArrayList<>(); this.version = version; @@ -237,10 +242,11 @@ public class CarbonInputSplit extends FileSplit String filePath1 = this.getPath().getName(); String filePath2 = other.getPath().getName(); if (CarbonTablePath.isCarbonDataFile(filePath1)) { - int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1)); - int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2)); - if (firstTaskId != otherTaskId) { - return firstTaskId - otherTaskId; + byte[] firstTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath1).getBytes(); + byte[] otherTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath2).getBytes(); + int compare = ByteUtil.compare(firstTaskId, otherTaskId); + if (compare != 0) { + return compare; } int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala new file mode 100644 index 0000000..70007c6 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.dataload + +import java.io.{BufferedWriter, File, FileWriter, FilenameFilter} + +import org.apache.spark.sql.common.util.QueryTest +import org.apache.spark.sql.Row +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { + var filePath: String = _ + + + def buildTestData() = { + filePath = s"${integrationPath}/spark-common-test/target/big.csv" + val file = new File(filePath) + val writer = new BufferedWriter(new FileWriter(file)) + writer.write("c1,c2,c3, c4, c5, c6, c7, c8, c9, c10") + writer.newLine() + for(i <- 0 until 200000) { + writer.write("a" + i%1000 + "," + + "b" + i%1000 + "," + + "c" + i%1000 + "," + + "d" + i%1000 + "," + + "e" + i%1000 + "," + + "f" + i%1000 + "," + + i%1000 + "," + + i%1000 + "," + + i%1000 + "," + + i%1000 + "\n") + if ( i % 10000 == 0) { + writer.flush() + } + } + writer.close() + } + + def dropTable() = { + sql("DROP TABLE IF EXISTS carbon_load1") + sql("DROP TABLE IF EXISTS carbon_load2") + sql("DROP TABLE IF EXISTS carbon_load3") + sql("DROP TABLE IF EXISTS carbon_load4") + sql("DROP TABLE IF EXISTS carbon_load5") + sql("DROP TABLE IF EXISTS carbon_load6") + } + + + + override def beforeAll { + dropTable + buildTestData + } + + + + test("test batch sort load by passing option to load command") { + + sql( + """ + | CREATE TABLE carbon_load1(c1 string, c2 string, c3 string, c4 string, c5 string, + | c6 string, c7 int, c8 int, c9 int, c10 int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " + + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") + + checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(200000))) + + assert(getIndexfileCount("carbon_load1") == 12, "Something wrong in batch sort") + } + + test("test batch sort load by passing option to load command and compare with normal load") { + + sql( + """ + | CREATE TABLE carbon_load2(c1 string, c2 string, c3 string, c4 string, c5 string, + | c6 string, c7 int, c8 int, c9 int, c10 int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load2 ") + + checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"), + sql("select * from carbon_load2 where c1='a1' order by c1")) + + } + + test("test batch sort load by passing option and compaction") { + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " + + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " + + s"OPTIONS('sort_scope'='batch_sort', 
'batch_sort_size_inmb'='1')") + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " + + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") + sql("alter table carbon_load1 compact 'major'") + Thread.sleep(4000) + checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(800000))) + + assert(getIndexfileCount("carbon_load1", "0.1") == 1, "Something wrong in compaction after batch sort") + + } + + test("test batch sort load by passing option in one load and with out option in other load and then do compaction") { + + sql( + """ + | CREATE TABLE carbon_load5(c1 string, c2 string, c3 string, c4 string, c5 string, + | c6 string, c7 int, c8 int, c9 int, c10 int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 " + + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ") + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 " + + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')") + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ") + + checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(800000))) + + checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"), + sql("select * from carbon_load5 where c1='a1' order by c1")) + + sql("alter table carbon_load5 compact 'major'") + Thread.sleep(4000) + + assert(getIndexfileCount("carbon_load5", "0.1") == 1, + "Something wrong in compaction after batch sort") + + checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"), + sql("select * from carbon_load5 where c1='a1' order by c1")) + + } + + test("test batch sort load by passing option with single pass") { + + sql( + """ + | CREATE TABLE carbon_load3(c1 string, c2 string, c3 string, c4 string, c5 string, + | c6 string, c7 int, c8 int, c9 int, c10 int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load3 " + + s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1', 'single_pass'='true')") + + checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(200000))) + + assert(getIndexfileCount("carbon_load3") == 12, "Something wrong in batch sort") + + checkAnswer(sql("select * from carbon_load3 where c1='a1' order by c1"), + sql("select * from carbon_load2 where c1='a1' order by c1")) + + } + + test("test batch sort load by with out passing option but through carbon properties") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "1") + sql( + """ + | CREATE TABLE carbon_load4(c1 string, c2 string, c3 string, c4 string, c5 string, + | c6 string, c7 int, c8 int, c9 int, c10 int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load4 " ) + + checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(200000))) + + assert(getIndexfileCount("carbon_load4") == 12, "Something wrong in batch sort") + CarbonProperties.getInstance(). 
+ addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0") + } + + test("test batch sort load by with out passing option but through carbon properties with default size") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT") + sql( + """ + | CREATE TABLE carbon_load6(c1 string, c2 string, c3 string, c4 string, c5 string, + | c6 string, c7 int, c8 int, c9 int, c10 int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load6 " ) + + checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(200000))) + + assert(getIndexfileCount("carbon_load6") == 1, "Something wrong in batch sort") + CarbonProperties.getInstance(). + addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) + } + + def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = { + val store = storeLocation +"/default/"+ tableName + "/Fact/Part0/Segment_"+segmentNo + val list = new File(store).list(new FilenameFilter { + override def accept(dir: File, name: String) = name.endsWith(".carbonindex") + }) + list.size + } + + override def afterAll { + dropTable + new File(filePath).delete() + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index afc4a58..a701c72 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.processing.constants.LoggerAction +import org.apache.carbondata.processing.newflow.sort.SortScopeOptions import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil @@ -753,7 +754,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT", "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION", "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", - "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD" + "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB" ) var isSupported = true val invalidOptions = StringBuilder.newBuilder @@ -808,6 +809,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { } } + if (options.exists(_._1.equalsIgnoreCase("SORT_SCOPE"))) { + val optionValue: String = options.get("sort_scope").get.head._2 + if (!SortScopeOptions.isValidSortOption(optionValue)) { + throw new MalformedCarbonCommandException( + "option SORT_SCOPE can only be NO_SORT, BATCH_SORT or LOCAL_SORT") + } + } + // check for duplicate options val duplicateOptions = options filter { case (_, optionlist) => 
optionlist.size > 1 http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 1192e08..494beff 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -405,6 +405,8 @@ case class LoadTable( val dateFormat = options.getOrElse("dateformat", null) validateDateFormat(dateFormat, table) val maxColumns = options.getOrElse("maxcolumns", null) + val sortScope = options.getOrElse("sort_scope", null) + val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null) carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) @@ -428,6 +430,8 @@ case class LoadTable( carbonLoadModel .setIsEmptyDataBadRecord( DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord) + carbonLoadModel.setSortScope(sortScope) + carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB) // when single_pass=true, and not use all dict val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match { case "true" => http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index e2405f2..09824d8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -417,6 +417,8 @@ case class LoadTable( val dateFormat = options.getOrElse("dateformat", null) validateDateFormat(dateFormat, table) val maxColumns = options.getOrElse("maxcolumns", null) + val sortScope = options.getOrElse("sort_scope", null) + val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null) carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#")) @@ -439,6 +441,8 @@ case class LoadTable( carbonLoadModel .setIsEmptyDataBadRecord( DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord) + carbonLoadModel.setSortScope(sortScope) + carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB) val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match { case "true" => true http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala index 0465aa7..5e91574 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala @@ -116,9 +116,9 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true"); + .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "batch_sort") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL") sql("create table data_bm(name String, dob long, weight int) " + "STORED BY 'org.apache.carbondata.format'") val testData = s"$resourcesPath/badrecords/dummy.csv" @@ -132,7 +132,8 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { } finally { CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false"); + .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT); CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); @@ -148,9 +149,9 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true"); + .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE"); + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") sql("create table data_bmf(name String, dob long, weight int) " + "STORED BY 'org.apache.carbondata.format'") val testData = s"$resourcesPath/badrecords/dummy.csv" @@ -166,10 +167,11 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { } finally { CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false"); + .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) } } @@ -182,7 +184,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true"); + .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT") sql("create table data_bm_no_good_data(name String, dob long, weight int) " + "STORED BY 'org.apache.carbondata.format'") val testData = s"$resourcesPath/badrecords/dummy2.csv" @@ -198,10 +200,11 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { } finally { CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false"); + .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + 
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) } } @@ -214,7 +217,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL") sql("create table data_tbm(name String, dob long, weight int) " + "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " + "'bucketcolumns'='name', 'tableName'='data_tbm')") @@ -232,7 +235,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { finally { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index d8f84bf..3a2e2eb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -171,7 +171,15 @@ public class CarbonLoadModel implements Serializable { */ private boolean preFetch; - private String numberOfcolumns; + /** + * Sort scope of the load: NO_SORT, BATCH_SORT or LOCAL_SORT + */ + private String sortScope; + + /** + * Batch sort size in mb. 
+ */ + private String batchSortSizeInMb; /** * get escape char * @@ -391,6 +399,8 @@ public class CarbonLoadModel implements Serializable { copy.dictionaryServerPort = dictionaryServerPort; copy.preFetch = preFetch; copy.isEmptyDataBadRecord = isEmptyDataBadRecord; + copy.sortScope = sortScope; + copy.batchSortSizeInMb = batchSortSizeInMb; return copy; } @@ -442,6 +452,8 @@ public class CarbonLoadModel implements Serializable { copyObj.dictionaryServerPort = dictionaryServerPort; copyObj.preFetch = preFetch; copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord; + copyObj.sortScope = sortScope; + copyObj.batchSortSizeInMb = batchSortSizeInMb; return copyObj; } @@ -773,4 +785,20 @@ public class CarbonLoadModel implements Serializable { public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) { this.isEmptyDataBadRecord = isEmptyDataBadRecord; } + + public String getSortScope() { + return sortScope; + } + + public void setSortScope(String sortScope) { + this.sortScope = sortScope; + } + + public String getBatchSortSizeInMb() { + return batchSortSizeInMb; + } + + public void setBatchSortSizeInMb(String batchSortSizeInMb) { + this.batchSortSizeInMb = batchSortSizeInMb; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 8865518..5c7c035 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -35,6 +35,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.model.CarbonLoadModel; import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.sort.SortScopeOptions; +import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl; import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl; @@ -53,14 +55,12 @@ public final class DataLoadProcessBuilder { public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation, CarbonIterator[] inputIterators) throws Exception { - boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, - CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT)); CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); + SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); if (configuration.getBucketingInfo() != null) { return buildInternalForBucketing(inputIterators, configuration); - } else if (batchSort) { + } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) { return buildInternalForBatchSort(inputIterators, configuration); } else { return buildInternal(inputIterators, configuration); @@ -158,6 +158,10 @@ public final 
class DataLoadProcessBuilder { loadModel.getIsEmptyDataBadRecord().split(",")[1]); configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH, loadModel.getFactFilePath()); + configuration + .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope()); + configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + loadModel.getBatchSortSizeInMb()); CarbonMetadata.getInstance().addCarbonTable(carbonTable); List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java new file mode 100644 index 0000000..f2534db --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.newflow.sort; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; + +/** + * Sort scope options + */ +public class SortScopeOptions { + + public static SortScope getSortScope(String sortScope) { + if (sortScope == null) { + sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT; + } + switch (sortScope.toUpperCase()) { + case "BATCH_SORT": + return SortScope.BATCH_SORT; + case "LOCAL_SORT": + return SortScope.LOCAL_SORT; + case "NO_SORT": + return SortScope.NO_SORT; + default: + return SortScope.LOCAL_SORT; + } + } + + public static boolean isValidSortOption(String sortScope) { + if (sortScope == null) { + return false; + } + switch (sortScope.toUpperCase()) { + case "BATCH_SORT": + return true; + case "LOCAL_SORT": + return true; + case "NO_SORT": + return true; + default: + return false; + } + } + + public enum SortScope { + NO_SORT, BATCH_SORT, LOCAL_SORT; + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java index 60cca69..39a21ad 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java @@ -29,6 +29,7 @@ import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorte import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl; import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl; import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; public class SorterFactory { @@ -39,9 +40,7 @@ public class SorterFactory { boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)); - boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, - CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT)); + SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); Sorter sorter; if (offheapsort) { if (configuration.getBucketingInfo() != null) { @@ -58,7 +57,7 @@ public class SorterFactory { sorter = new ParallelReadMergeSorterImpl(counter); } } - if (batchSort) { + if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) { if (configuration.getBucketingInfo() == null) { sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index df3825a..898b73d 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -98,9 +98,10 @@ public class UnsafeSortDataRows { .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT, CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT)); - this.maxSizeAllowed = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0")); + this.maxSizeAllowed = parameters.getBatchSortSizeinMb(); if (maxSizeAllowed <= 0) { + // If user does not input any memory size, then take half the size of usable memory configured + // in sort memory size. this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2; } else { this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java index 3c3a9d8..07149f7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java @@ -114,6 +114,8 @@ public class SortParameters { private int numberOfCores; + private int batchSortSizeinMb; + public SortParameters getCopy() { SortParameters parameters = new SortParameters(); parameters.tempFileLocation = tempFileLocation; @@ -138,6 +140,7 @@ public class SortParameters { parameters.taskNo = taskNo; parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn; parameters.numberOfCores = numberOfCores; + parameters.batchSortSizeinMb = batchSortSizeinMb; return parameters; } @@ -317,6 +320,14 @@ public class SortParameters { this.numberOfCores = numberOfCores; } + public int getBatchSortSizeinMb() { + return batchSortSizeinMb; + } + + public void setBatchSortSizeinMb(int batchSortSizeinMb) { + this.batchSortSizeinMb = batchSortSizeinMb; + } + public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) { SortParameters parameters = new SortParameters(); CarbonTableIdentifier tableIdentifier = @@ -334,6 +345,8 @@ public class SortParameters { parameters.setComplexDimColCount(configuration.getComplexDimensionCount()); parameters.setNoDictionaryDimnesionColumn( CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields())); + parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration)); + parameters.setObserver(new SortObserver()); // get sort buffer size parameters.setSortBufferSize(Integer.parseInt(carbonProperties http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 41bfbed..a4de24e 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -56,8 +56,10 @@ import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.datatypes.PrimitiveDataType; import org.apache.carbondata.processing.datatypes.StructDataType; import org.apache.carbondata.processing.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.sort.SortScopeOptions; import org.apache.commons.lang3.ArrayUtils; @@ -522,4 +524,53 @@ public final class CarbonDataProcessorUtil { return aggType; } + /** + * Get the sort scope to use for the load. + * @param configuration data load configuration + * @return the resolved sort scope + */ + public static SortScopeOptions.SortScope getSortScope(CarbonDataLoadConfiguration configuration) { + SortScopeOptions.SortScope sortScope; + try { + // first check whether the user set it in the DDL, otherwise get it from carbon properties + if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) == null) { + sortScope = SortScopeOptions.getSortScope(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)); + } else { + sortScope = SortScopeOptions.getSortScope( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) + .toString()); + } + } catch (Exception e) { + sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT); + LOGGER.warn("sort scope is set to " + sortScope); + } + return sortScope; + } + + /** + * Get the batch sort size in MB. + * @param configuration data load configuration + * @return batch sort size in MB, or 0 if not configured + */ + public static int getBatchSortSizeinMb(CarbonDataLoadConfiguration configuration) { + int batchSortSizeInMb; + try { + // first check whether the user set it in the DDL, otherwise get it from carbon properties + if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB) + == null) { + batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0")); + } else { + batchSortSizeInMb = Integer.parseInt( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB) + .toString()); + } + } catch (Exception e) { + batchSortSizeInMb = 0; + } + return batchSortSizeInMb; + } + } \ No newline at end of file
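Note on resolution order: CarbonDataProcessorUtil.getSortScope prefers the per-load DDL option over the carbon.load.sort.scope property, and any missing or unrecognised value falls back to LOCAL_SORT. A standalone Scala sketch of that behaviour (not part of the patch; names are illustrative):

    object SortScopeSketch {
      sealed trait SortScope
      case object NO_SORT extends SortScope
      case object BATCH_SORT extends SortScope
      case object LOCAL_SORT extends SortScope

      // Mirrors SortScopeOptions.getSortScope: unknown values map to LOCAL_SORT
      def parse(value: String): SortScope = value.toUpperCase match {
        case "NO_SORT" => NO_SORT
        case "BATCH_SORT" => BATCH_SORT
        case _ => LOCAL_SORT
      }

      // DDL load option wins over the carbon property, which wins over the default
      def resolve(ddlOption: Option[String], carbonProperty: Option[String]): SortScope =
        parse(ddlOption.orElse(carbonProperty).getOrElse("LOCAL_SORT"))
    }

For example, resolve(Some("batch_sort"), Some("NO_SORT")) yields BATCH_SORT, the precedence exercised by the new TestBatchSortDataLoad tests. getBatchSortSizeinMb resolves BATCH_SORT_SIZE_INMB with the same precedence, and UnsafeSortDataRows treats a size of 0 as "use half of the usable sort memory".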