[CARBONDATA-2513][32K] Support writing long string from dataframe

Support writing long string (varchar) columns when loading a Spark dataframe into CarbonData.
This closes #2382

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/55f4bc6c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/55f4bc6c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/55f4bc6c

Branch: refs/heads/carbonstore
Commit: 55f4bc6c89f637b162b414033512901e9bd8a745
Parents: 218a8de
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Wed Jun 20 19:01:24 2018 +0800
Committer: kumarvishal09 <kumarvishal1...@gmail.com>
Committed: Thu Jun 21 12:31:21 2018 +0530

----------------------------------------------------------------------
 .../VarcharDataTypesBasicTestCase.scala         | 32 +++++++++++++++++++-
 .../apache/carbondata/spark/CarbonOption.scala  |  2 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 15 +++++++--
 .../carbondata/spark/util/CarbonScalaUtil.scala |  3 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |  1 +
 .../streaming/parser/FieldConverter.scala       | 11 +++++--
 6 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 9ea3f1f..9798178 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -20,8 +20,9 @@ package org.apache.carbondata.spark.testsuite.longstring
 import java.io.{File, PrintWriter}
 
 import org.apache.commons.lang3.RandomStringUtils
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -36,6 +37,7 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
   private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
   private val lineNum = 1000
   private var content: Content = _
+  private var longStringDF: DataFrame = _
   private var originMemorySize = CarbonProperties.getInstance().getProperty(
     CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
     CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
@@ -257,6 +259,34 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
     // since after exception wrapper, we cannot get the root cause directly
   }
 
+  private def prepareDF(): Unit = {
+    val schema = StructType(
+      StructField("id", IntegerType, nullable = true) ::
+      StructField("name", StringType, nullable = true) ::
+      StructField("description", StringType, nullable = true) ::
+      StructField("address", StringType, nullable = true) ::
+      StructField("note", StringType, nullable = true) :: Nil
+    )
+    longStringDF = sqlContext.sparkSession.read
+      .schema(schema)
+      .csv(inputFile)
+  }
+
+  test("write from dataframe with long string datatype") {
+    prepareDF()
+    // write spark dataframe to carbondata with `long_string_columns` property
+    longStringDF.write
+      .format("carbondata")
+      .option("tableName", longStringTable)
+      .option("single_pass", "false")
+      .option("sort_columns", "name")
+      .option("long_string_columns", "description, note")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    checkQuery()
+  }
+
   // will create 2 long string columns
   private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
       varcharLen: Int = Short.MaxValue + 1000): Content = {
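The test above drives the new dataframe write path end to end. For comparison, a minimal sketch of the equivalent SQL route, assuming a CarbonData-enabled SparkSession named `spark` (the table and column names here are illustrative):

    // Hypothetical table; LONG_STRING_COLUMNS marks string columns that may
    // exceed the default per-column length limit.
    spark.sql(
      """
        | CREATE TABLE IF NOT EXISTS long_string_table (
        |   id INT, name STRING, description STRING, address STRING, note STRING
        | )
        | STORED BY 'carbondata'
        | TBLPROPERTIES('LONG_STRING_COLUMNS'='description, note')
      """.stripMargin)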
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index a48e63d..5f23f77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -48,6 +48,8 @@ class CarbonOption(options: Map[String, String]) {
 
   def dictionaryExclude: Option[String] = options.get("dictionary_exclude")
 
+  def longStringColumns: Option[String] = options.get("long_string_columns")
+
   def tableBlockSize: Option[String] = options.get("table_blocksize")
 
   def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt


http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 4bfdd3b..5ed39fa 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -447,6 +448,10 @@ class NewRddIterator(rddIter: Iterator[Row],
   private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+  import scala.collection.JavaConverters._
+  private val isVarcharTypeMapping =
+    carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn(
+      carbonLoadModel.getTableName).asScala.map(_.getDataType == DataTypes.VARCHAR)
 
   def hasNext: Boolean = rddIter.hasNext
 
   def next: Array[AnyRef] = {
@@ -454,7 +459,8 @@ class NewRddIterator(rddIter: Iterator[Row],
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
       columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+        isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
     }
     columns
   }
@@ -491,6 +497,10 @@ class LazyRddIterator(serializer: SerializerInstance,
   private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+  import scala.collection.JavaConverters._
+  private val isVarcharTypeMapping =
+    carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn(
+      carbonLoadModel.getTableName).asScala.map(_.getDataType == DataTypes.VARCHAR)
 
   private var rddIter: Iterator[Row] = null
   private var uninitialized = true
@@ -514,7 +524,8 @@ class LazyRddIterator(serializer: SerializerInstance,
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
       columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+        isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
     }
     columns
   }
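Both iterators follow the same pattern: compute a per-column varchar mask once, from the table's create-order columns, then consult it for every field of every row. A minimal self-contained sketch of that pattern (the `Column` class and sample data are illustrative, not CarbonData API):

    // Precompute the mask once per iterator instead of inspecting the schema per row.
    final case class Column(name: String, isVarchar: Boolean)

    val columnsInCreateOrder = Seq(
      Column("id", isVarchar = false),
      Column("name", isVarchar = false),
      Column("note", isVarchar = true))

    val isVarcharTypeMapping: Seq[Boolean] = columnsInCreateOrder.map(_.isVarchar)

    // The bounds check guards rows that carry more fields than the schema declares.
    def isVarcharAt(i: Int): Boolean =
      i < isVarcharTypeMapping.size && isVarcharTypeMapping(i)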
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3e94a66..44d3cca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -127,9 +127,10 @@ object CarbonScalaUtil {
       delimiterLevel2: String,
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
+      isVarcharType: Boolean = false,
       level: Int = 1): String = {
     FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1,
-      delimiterLevel2, timeStampFormat, dateFormat, level)
+      delimiterLevel2, timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 67817c0..c81622e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -86,6 +86,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "SORT_COLUMNS" -> options.sortColumns,
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
+      "LONG_STRING_COLUMNS" -> options.longStringColumns,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize,
       "STREAMING" -> Option(options.isStreaming.toString)
     ).filter(_._2.isDefined)
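CarbonScalaUtil.getString simply forwards the new flag to FieldConverter, while the writer change maps the dataframe option onto the LONG_STRING_COLUMNS table property. A rough sketch of that option-to-property plumbing, simplified rather than the actual CarbonDataFrameWriter code:

    // Only options the caller actually set survive the filter and become TBLPROPERTIES.
    val options = Map("long_string_columns" -> "description, note")
    val tblProperties = Seq(
        "LONG_STRING_COLUMNS" -> options.get("long_string_columns"))
      .filter(_._2.isDefined)
      .map { case (key, value) => s"'$key'='${value.get}'" }
      .mkString(", ")
    // tblProperties == "'LONG_STRING_COLUMNS'='description, note'"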
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 8661417..e167d46 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -32,6 +32,7 @@ object FieldConverter {
    * @param delimiterLevel2 level 2 delimiter for complex type
    * @param timeStampFormat timestamp format
    * @param dateFormat date format
+   * @param isVarcharType whether it is varchar type. A varchar type has no string length limit
    * @param level level for recursive call
    */
  def objectToString(
@@ -41,12 +42,14 @@
       delimiterLevel2: String,
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
+      isVarcharType: Boolean = false,
       level: Int = 1): String = {
     if (value == null) {
       serializationNullFormat
     } else {
       value match {
-        case s: String => if (s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+        case s: String => if (!isVarcharType &&
+            s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
           throw new Exception("Dataload failed, String length cannot exceed " +
             CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters")
         } else {
@@ -71,7 +74,8 @@ object FieldConverter {
           val builder = new StringBuilder()
           s.foreach { x =>
             builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
-              delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+              delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+              .append(delimiter)
           }
           builder.substring(0, builder.length - delimiter.length())
         case m: scala.collection.Map[Any, Any] =>
@@ -85,7 +89,8 @@ object FieldConverter {
           val builder = new StringBuilder()
           for (i <- 0 until r.length) {
             builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
-              delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+              delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+              .append(delimiter)
           }
           builder.substring(0, builder.length - delimiter.length())
         case other => other.toString
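The behavioral core of this change is the guard on plain strings: the length check now applies only when the column is not varchar. A standalone sketch of that guard, with `maxChars` standing in for CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT:

    // Varchar columns bypass the per-column string length limit entirely.
    def guardLength(s: String, isVarcharType: Boolean, maxChars: Int): String =
      if (!isVarcharType && s.length > maxChars) {
        throw new RuntimeException(
          s"Dataload failed, String length cannot exceed $maxChars characters")
      } else {
        s
      }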